/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.
Collections;
import java.util.
Iterator;
import java.util.
LinkedHashSet;
import java.util.
Set;
import java.util.concurrent.
Executor;
import java.util.concurrent.
ThreadFactory;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicInteger;
/**
* Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
* the same time.
*/
public abstract class
MultithreadEventExecutorGroup extends
AbstractEventExecutorGroup {
private final
EventExecutor[]
children;
private final
Set<
EventExecutor>
readonlyChildren;
private final
AtomicInteger terminatedChildren = new
AtomicInteger();
private final
Promise<?>
terminationFuture = new
DefaultPromise(
GlobalEventExecutor.
INSTANCE);
private final
EventExecutorChooserFactory.
EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected
MultithreadEventExecutorGroup(int
nThreads,
ThreadFactory threadFactory,
Object...
args) {
this(
nThreads,
threadFactory == null ? null : new
ThreadPerTaskExecutor(
threadFactory),
args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected
MultithreadEventExecutorGroup(int
nThreads,
Executor executor,
Object...
args) {
this(
nThreads,
executor,
DefaultEventExecutorChooserFactory.
INSTANCE,
args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected
MultithreadEventExecutorGroup(int
nThreads,
Executor executor,
EventExecutorChooserFactory chooserFactory,
Object...
args) {
if (
nThreads <= 0) {
throw new
IllegalArgumentException(
String.
format("nThreads: %d (expected: > 0)",
nThreads));
}
if (
executor == null) {
executor = new
ThreadPerTaskExecutor(
newDefaultThreadFactory());
}
children = new
EventExecutor[
nThreads];
for (int
i = 0;
i <
nThreads;
i ++) {
boolean
success = false;
try {
children[
i] =
newChild(
executor,
args);
success = true;
} catch (
Exception e) {
// TODO: Think about if this is a good exception type
throw new
IllegalStateException("failed to create a child event loop",
e);
} finally {
if (!
success) {
for (int
j = 0;
j <
i;
j ++) {
children[
j].
shutdownGracefully();
}
for (int
j = 0;
j <
i;
j ++) {
EventExecutor e =
children[
j];
try {
while (!
e.
isTerminated()) {
e.
awaitTermination(
Integer.
MAX_VALUE,
TimeUnit.
SECONDS);
}
} catch (
InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.
currentThread().
interrupt();
break;
}
}
}
}
}
chooser =
chooserFactory.
newChooser(
children);
final
FutureListener<
Object>
terminationListener = new
FutureListener<
Object>() {
@
Override
public void
operationComplete(
Future<
Object>
future) throws
Exception {
if (
terminatedChildren.
incrementAndGet() ==
children.length) {
terminationFuture.
setSuccess(null);
}
}
};
for (
EventExecutor e:
children) {
e.
terminationFuture().
addListener(
terminationListener);
}
Set<
EventExecutor>
childrenSet = new
LinkedHashSet<
EventExecutor>(
children.length);
Collections.
addAll(
childrenSet,
children);
readonlyChildren =
Collections.
unmodifiableSet(
childrenSet);
}
protected
ThreadFactory newDefaultThreadFactory() {
return new
DefaultThreadFactory(
getClass());
}
@
Override
public
EventExecutor next() {
return
chooser.
next();
}
@
Override
public
Iterator<
EventExecutor>
iterator() {
return
readonlyChildren.
iterator();
}
/**
* Return the number of {@link EventExecutor} this implementation uses. This number is the maps
* 1:1 to the threads it use.
*/
public final int
executorCount() {
return
children.length;
}
/**
* Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
*/
protected abstract
EventExecutor newChild(
Executor executor,
Object...
args) throws
Exception;
@
Override
public
Future<?>
shutdownGracefully(long
quietPeriod, long
timeout,
TimeUnit unit) {
for (
EventExecutor l:
children) {
l.
shutdownGracefully(
quietPeriod,
timeout,
unit);
}
return
terminationFuture();
}
@
Override
public
Future<?>
terminationFuture() {
return
terminationFuture;
}
@
Override
@
Deprecated
public void
shutdown() {
for (
EventExecutor l:
children) {
l.
shutdown();
}
}
@
Override
public boolean
isShuttingDown() {
for (
EventExecutor l:
children) {
if (!
l.
isShuttingDown()) {
return false;
}
}
return true;
}
@
Override
public boolean
isShutdown() {
for (
EventExecutor l:
children) {
if (!
l.
isShutdown()) {
return false;
}
}
return true;
}
@
Override
public boolean
isTerminated() {
for (
EventExecutor l:
children) {
if (!
l.
isTerminated()) {
return false;
}
}
return true;
}
@
Override
public boolean
awaitTermination(long
timeout,
TimeUnit unit)
throws
InterruptedException {
long
deadline =
System.
nanoTime() +
unit.
toNanos(
timeout);
loop: for (
EventExecutor l:
children) {
for (;;) {
long
timeLeft =
deadline -
System.
nanoTime();
if (
timeLeft <= 0) {
break
loop;
}
if (
l.
awaitTermination(
timeLeft,
TimeUnit.
NANOSECONDS)) {
break;
}
}
}
return
isTerminated();
}
}