/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.
AbstractEventExecutorGroup;
import io.netty.util.concurrent.
DefaultPromise;
import io.netty.util.concurrent.
EventExecutor;
import io.netty.util.concurrent.
Future;
import io.netty.util.concurrent.
FutureListener;
import io.netty.util.concurrent.
GlobalEventExecutor;
import io.netty.util.concurrent.
Promise;
import io.netty.util.concurrent.
ThreadPerTaskExecutor;
import io.netty.util.internal.
EmptyArrays;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
ReadOnlyIterator;
import io.netty.util.internal.
ThrowableUtil;
import java.util.
Collections;
import java.util.
Iterator;
import java.util.
Queue;
import java.util.
Set;
import java.util.concurrent.
ConcurrentLinkedQueue;
import java.util.concurrent.
Executor;
import java.util.concurrent.
Executors;
import java.util.concurrent.
RejectedExecutionException;
import java.util.concurrent.
ThreadFactory;
import java.util.concurrent.
TimeUnit;
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*/
public class
ThreadPerChannelEventLoopGroup extends
AbstractEventExecutorGroup implements
EventLoopGroup {
private final
Object[]
childArgs;
private final int
maxChannels;
final
Executor executor;
final
Set<
EventLoop>
activeChildren =
Collections.
newSetFromMap(
PlatformDependent.<
EventLoop,
Boolean>
newConcurrentHashMap());
final
Queue<
EventLoop>
idleChildren = new
ConcurrentLinkedQueue<
EventLoop>();
private final
ChannelException tooManyChannels;
private volatile boolean
shuttingDown;
private final
Promise<?>
terminationFuture = new
DefaultPromise<
Void>(
GlobalEventExecutor.
INSTANCE);
private final
FutureListener<
Object>
childTerminationListener = new
FutureListener<
Object>() {
@
Override
public void
operationComplete(
Future<
Object>
future) throws
Exception {
// Inefficient, but works.
if (
isTerminated()) {
terminationFuture.
trySuccess(null);
}
}
};
/**
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
*/
protected
ThreadPerChannelEventLoopGroup() {
this(0);
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
protected
ThreadPerChannelEventLoopGroup(int
maxChannels) {
this(
maxChannels,
Executors.
defaultThreadFactory());
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected
ThreadPerChannelEventLoopGroup(int
maxChannels,
ThreadFactory threadFactory,
Object...
args) {
this(
maxChannels, new
ThreadPerTaskExecutor(
threadFactory),
args);
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected
ThreadPerChannelEventLoopGroup(int
maxChannels,
Executor executor,
Object...
args) {
if (
maxChannels < 0) {
throw new
IllegalArgumentException(
String.
format(
"maxChannels: %d (expected: >= 0)",
maxChannels));
}
if (
executor == null) {
throw new
NullPointerException("executor");
}
if (
args == null) {
childArgs =
EmptyArrays.
EMPTY_OBJECTS;
} else {
childArgs =
args.
clone();
}
this.
maxChannels =
maxChannels;
this.
executor =
executor;
tooManyChannels =
ThrowableUtil.
unknownStackTrace(
new
ChannelException("too many channels (max: " +
maxChannels + ')'),
ThreadPerChannelEventLoopGroup.class, "nextChild()");
}
/**
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
*/
protected
EventLoop newChild(@
SuppressWarnings("UnusedParameters")
Object...
args) throws
Exception {
return new
ThreadPerChannelEventLoop(this);
}
@
Override
public
Iterator<
EventExecutor>
iterator() {
return new
ReadOnlyIterator<
EventExecutor>(
activeChildren.
iterator());
}
@
Override
public
EventLoop next() {
throw new
UnsupportedOperationException();
}
@
Override
public
Future<?>
shutdownGracefully(long
quietPeriod, long
timeout,
TimeUnit unit) {
shuttingDown = true;
for (
EventLoop l:
activeChildren) {
l.
shutdownGracefully(
quietPeriod,
timeout,
unit);
}
for (
EventLoop l:
idleChildren) {
l.
shutdownGracefully(
quietPeriod,
timeout,
unit);
}
// Notify the future if there was no children.
if (
isTerminated()) {
terminationFuture.
trySuccess(null);
}
return
terminationFuture();
}
@
Override
public
Future<?>
terminationFuture() {
return
terminationFuture;
}
@
Override
@
Deprecated
public void
shutdown() {
shuttingDown = true;
for (
EventLoop l:
activeChildren) {
l.
shutdown();
}
for (
EventLoop l:
idleChildren) {
l.
shutdown();
}
// Notify the future if there was no children.
if (
isTerminated()) {
terminationFuture.
trySuccess(null);
}
}
@
Override
public boolean
isShuttingDown() {
for (
EventLoop l:
activeChildren) {
if (!
l.
isShuttingDown()) {
return false;
}
}
for (
EventLoop l:
idleChildren) {
if (!
l.
isShuttingDown()) {
return false;
}
}
return true;
}
@
Override
public boolean
isShutdown() {
for (
EventLoop l:
activeChildren) {
if (!
l.
isShutdown()) {
return false;
}
}
for (
EventLoop l:
idleChildren) {
if (!
l.
isShutdown()) {
return false;
}
}
return true;
}
@
Override
public boolean
isTerminated() {
for (
EventLoop l:
activeChildren) {
if (!
l.
isTerminated()) {
return false;
}
}
for (
EventLoop l:
idleChildren) {
if (!
l.
isTerminated()) {
return false;
}
}
return true;
}
@
Override
public boolean
awaitTermination(long
timeout,
TimeUnit unit)
throws
InterruptedException {
long
deadline =
System.
nanoTime() +
unit.
toNanos(
timeout);
for (
EventLoop l:
activeChildren) {
for (;;) {
long
timeLeft =
deadline -
System.
nanoTime();
if (
timeLeft <= 0) {
return
isTerminated();
}
if (
l.
awaitTermination(
timeLeft,
TimeUnit.
NANOSECONDS)) {
break;
}
}
}
for (
EventLoop l:
idleChildren) {
for (;;) {
long
timeLeft =
deadline -
System.
nanoTime();
if (
timeLeft <= 0) {
return
isTerminated();
}
if (
l.
awaitTermination(
timeLeft,
TimeUnit.
NANOSECONDS)) {
break;
}
}
}
return
isTerminated();
}
@
Override
public
ChannelFuture register(
Channel channel) {
if (
channel == null) {
throw new
NullPointerException("channel");
}
try {
EventLoop l =
nextChild();
return
l.
register(new
DefaultChannelPromise(
channel,
l));
} catch (
Throwable t) {
return new
FailedChannelFuture(
channel,
GlobalEventExecutor.
INSTANCE,
t);
}
}
@
Override
public
ChannelFuture register(
ChannelPromise promise) {
try {
return
nextChild().
register(
promise);
} catch (
Throwable t) {
promise.
setFailure(
t);
return
promise;
}
}
@
Deprecated
@
Override
public
ChannelFuture register(
Channel channel,
ChannelPromise promise) {
if (
channel == null) {
throw new
NullPointerException("channel");
}
try {
return
nextChild().
register(
channel,
promise);
} catch (
Throwable t) {
promise.
setFailure(
t);
return
promise;
}
}
private
EventLoop nextChild() throws
Exception {
if (
shuttingDown) {
throw new
RejectedExecutionException("shutting down");
}
EventLoop loop =
idleChildren.
poll();
if (
loop == null) {
if (
maxChannels > 0 &&
activeChildren.
size() >=
maxChannels) {
throw
tooManyChannels;
}
loop =
newChild(
childArgs);
loop.
terminationFuture().
addListener(
childTerminationListener);
}
activeChildren.
add(
loop);
return
loop;
}
}