/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.
ByteBufAllocator;
import io.netty.util.
Attribute;
import io.netty.util.
AttributeKey;
import io.netty.util.
DefaultAttributeMap;
import io.netty.util.
Recycler;
import io.netty.util.
ReferenceCountUtil;
import io.netty.util.
ResourceLeakHint;
import io.netty.util.concurrent.
EventExecutor;
import io.netty.util.concurrent.
OrderedEventExecutor;
import io.netty.util.internal.
PromiseNotificationUtil;
import io.netty.util.internal.
ThrowableUtil;
import io.netty.util.internal.
ObjectUtil;
import io.netty.util.internal.
StringUtil;
import io.netty.util.internal.
SystemPropertyUtil;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.net.
SocketAddress;
import java.util.concurrent.atomic.
AtomicIntegerFieldUpdater;
abstract class
AbstractChannelHandlerContext extends
DefaultAttributeMap
implements
ChannelHandlerContext,
ResourceLeakHint {
// Logger shared by all contexts; used when a failure cannot be reported through a promise or handler.
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
AbstractChannelHandlerContext.class);
// Neighbouring contexts. findContextInbound() follows "next" and findContextOutbound() follows "prev".
volatile
AbstractChannelHandlerContext next;
volatile
AbstractChannelHandlerContext prev;
// Updater used for the compare-and-set transitions of the handlerState field.
private static final
AtomicIntegerFieldUpdater<
AbstractChannelHandlerContext>
HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.
newUpdater(
AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int
ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int
ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int
REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int
INIT = 0;
// True when this context takes part in inbound event propagation (checked by findContextInbound()).
private final boolean
inbound;
// True when this context takes part in outbound operation propagation (checked by findContextOutbound()).
private final boolean
outbound;
// Pipeline this context belongs to; also provides the channel.
private final
DefaultChannelPipeline pipeline;
// Name of this context, never null (validated in the constructor).
private final
String name;
// Computed in the constructor: true when no child executor is set or the executor is an OrderedEventExecutor.
private final boolean
ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final
EventExecutor executor;
// Lazily created and cached by newSucceededFuture().
private
ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make these volatile as at worst it will just create a few more instances than needed.
private
Runnable invokeChannelReadCompleteTask;
private
Runnable invokeReadTask;
private
Runnable invokeChannelWritableStateChangedTask;
private
Runnable invokeFlushTask;
// One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE.
private volatile int
handlerState =
INIT;
/**
 * Creates a new context.
 *
 * @param pipeline the pipeline this context belongs to
 * @param executor the child executor to run the handler on, or {@code null} to use the channel's event loop
 * @param name     the name of this context, must not be {@code null}
 * @param inbound  whether this context takes part in inbound event propagation
 * @param outbound whether this context takes part in outbound operation propagation
 * @throws NullPointerException if {@code name} is {@code null}
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // The context is ordered when it is driven by the EventLoop (no child executor) or when the given
    // executor is an OrderedEventExecutor.
    if (executor == null) {
        this.ordered = true;
    } else {
        this.ordered = executor instanceof OrderedEventExecutor;
    }
}
/**
 * Returns the {@link Channel} of the pipeline this context belongs to.
 */
@Override
public Channel channel() {
    final DefaultChannelPipeline owner = pipeline;
    return owner.channel();
}
/**
 * Returns the pipeline this context was created for.
 */
@Override
public ChannelPipeline pipeline() {
    return this.pipeline;
}
/**
 * Returns the {@link ByteBufAllocator} configured on the channel.
 */
@Override
public ByteBufAllocator alloc() {
    final ChannelConfig config = channel().config();
    return config.getAllocator();
}
/**
 * Returns the child executor if one was given at construction time, otherwise the channel's event loop.
 */
@Override
public EventExecutor executor() {
    final EventExecutor childExecutor = executor;
    if (childExecutor != null) {
        return childExecutor;
    }
    return channel().eventLoop();
}
/**
 * Returns the name given to this context at construction time.
 */
@Override
public String name() {
    return this.name;
}
/**
 * Passes a channelRegistered event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelRegistered(target);
    return this;
}
/**
 * Delivers channelRegistered to {@code next}: directly when the caller is already on the context's
 * executor, otherwise as a task submitted to that executor.
 */
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
        return;
    }
    next.invokeChannelRegistered();
}
/**
 * Calls the handler's channelRegistered callback, or forwards the event when the handler must not be
 * invoked yet (see invokeHandler()). Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelRegistered() {
    if (!invokeHandler()) {
        fireChannelRegistered();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a channelUnregistered event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelUnregistered() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelUnregistered(target);
    return this;
}
/**
 * Delivers channelUnregistered to {@code next}: directly when the caller is already on the context's
 * executor, otherwise as a task submitted to that executor.
 */
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelUnregistered();
            }
        });
        return;
    }
    next.invokeChannelUnregistered();
}
/**
 * Calls the handler's channelUnregistered callback, or forwards the event when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelUnregistered() {
    if (!invokeHandler()) {
        fireChannelUnregistered();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelUnregistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a channelActive event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelActive(target);
    return this;
}
/**
 * Delivers channelActive to {@code next}: directly when the caller is already on the context's
 * executor, otherwise as a task submitted to that executor.
 */
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
        return;
    }
    next.invokeChannelActive();
}
/**
 * Calls the handler's channelActive callback, or forwards the event when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelActive() {
    if (!invokeHandler()) {
        fireChannelActive();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a channelInactive event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelInactive() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelInactive(target);
    return this;
}
/**
 * Delivers channelInactive to {@code next}: directly when the caller is already on the context's
 * executor, otherwise as a task submitted to that executor.
 */
static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelInactive();
            }
        });
        return;
    }
    next.invokeChannelInactive();
}
/**
 * Calls the handler's channelInactive callback, or forwards the event when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelInactive() {
    if (!invokeHandler()) {
        fireChannelInactive();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelInactive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes {@code cause} to the context directly following this one. Unlike the other fire* methods the
 * direct successor is used instead of the result of findContextInbound().
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    final AbstractChannelHandlerContext successor = next;
    invokeExceptionCaught(successor, cause);
    return this;
}
/**
 * Delivers {@code cause} to {@code next}: directly when the caller is already on the context's executor,
 * otherwise as a submitted task. A failure to submit is only logged, together with the undelivered cause.
 *
 * @throws NullPointerException if {@code cause} is {@code null}
 */
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
        return;
    }
    try {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeExceptionCaught(cause);
            }
        });
    } catch (Throwable t) {
        if (!logger.isWarnEnabled()) {
            return;
        }
        logger.warn("Failed to submit an exceptionCaught() event.", t);
        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
    }
}
/**
 * Calls the handler's exceptionCaught callback, or forwards the event when the handler must not be
 * invoked yet. An exception thrown by the callback itself is logged (at DEBUG with the full stack trace,
 * otherwise at WARN) and not propagated further.
 */
private void invokeExceptionCaught(final Throwable cause) {
    if (!invokeHandler()) {
        fireExceptionCaught(cause);
        return;
    }
    try {
        handler().exceptionCaught(this, cause);
    } catch (Throwable error) {
        if (logger.isDebugEnabled()) {
            final String errorTrace = ThrowableUtil.stackTraceToString(error);
            logger.debug(
                    "An exception {}" +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:",
                    errorTrace, cause);
            return;
        }
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception '{}' [enable DEBUG level for full stacktrace] " +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:",
                    error, cause);
        }
    }
}
/**
 * Passes a user event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeUserEventTriggered(target, event);
    return this;
}
/**
 * Delivers {@code event} to {@code next}: directly when the caller is already on the context's executor,
 * otherwise as a task submitted to that executor.
 *
 * @throws NullPointerException if {@code event} is {@code null}
 */
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
    ObjectUtil.checkNotNull(event, "event");
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
        return;
    }
    next.invokeUserEventTriggered(event);
}
/**
 * Calls the handler's userEventTriggered callback, or forwards the event when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeUserEventTriggered(Object event) {
    if (!invokeHandler()) {
        fireUserEventTriggered(event);
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.userEventTriggered(this, event);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a received message to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelRead(target, msg);
    return this;
}
/**
 * Delivers {@code msg} to {@code next} after passing it through the pipeline's touch(...) hook: directly
 * when the caller is already on the context's executor, otherwise as a task submitted to that executor.
 *
 * @throws NullPointerException if {@code msg} is {@code null}
 */
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    // Dereference the pipeline before the null check to keep the original evaluation order.
    final DefaultChannelPipeline owner = next.pipeline;
    final Object m = owner.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        nextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
        return;
    }
    next.invokeChannelRead(m);
}
/**
 * Calls the handler's channelRead callback, or forwards the message when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelRead(Object msg) {
    if (!invokeHandler()) {
        fireChannelRead(msg);
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a channelReadComplete event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelReadComplete() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelReadComplete(target);
    return this;
}
/**
 * Delivers channelReadComplete to {@code next}: directly when the caller is already on the context's
 * executor, otherwise through a task that is created once and cached on {@code next}.
 */
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeChannelReadComplete();
        return;
    }
    Runnable cached = next.invokeChannelReadCompleteTask;
    if (cached == null) {
        cached = new Runnable() {
            @Override
            public void run() {
                next.invokeChannelReadComplete();
            }
        };
        next.invokeChannelReadCompleteTask = cached;
    }
    nextExecutor.execute(cached);
}
/**
 * Calls the handler's channelReadComplete callback, or forwards the event when the handler must not be
 * invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelReadComplete() {
    if (!invokeHandler()) {
        fireChannelReadComplete();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelReadComplete(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Passes a channelWritabilityChanged event to the next inbound context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
    final AbstractChannelHandlerContext target = findContextInbound();
    invokeChannelWritabilityChanged(target);
    return this;
}
/**
 * Delivers channelWritabilityChanged to {@code next}: directly when the caller is already on the
 * context's executor, otherwise through a task that is created once and cached on {@code next}.
 */
static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeChannelWritabilityChanged();
        return;
    }
    Runnable cached = next.invokeChannelWritableStateChangedTask;
    if (cached == null) {
        cached = new Runnable() {
            @Override
            public void run() {
                next.invokeChannelWritabilityChanged();
            }
        };
        next.invokeChannelWritableStateChangedTask = cached;
    }
    nextExecutor.execute(cached);
}
/**
 * Calls the handler's channelWritabilityChanged callback, or forwards the event when the handler must
 * not be invoked yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeChannelWritabilityChanged() {
    if (!invokeHandler()) {
        fireChannelWritabilityChanged();
        return;
    }
    try {
        final ChannelInboundHandler inboundHandler = (ChannelInboundHandler) handler();
        inboundHandler.channelWritabilityChanged(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Binds to {@code localAddress} using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    final ChannelPromise promise = newPromise();
    return bind(localAddress, promise);
}
/**
 * Connects to {@code remoteAddress} using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    final ChannelPromise promise = newPromise();
    return connect(remoteAddress, promise);
}
/**
 * Connects to {@code remoteAddress} from {@code localAddress} using a promise freshly created by
 * newPromise().
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    final ChannelPromise promise = newPromise();
    return connect(remoteAddress, localAddress, promise);
}
/**
 * Disconnects using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture disconnect() {
    final ChannelPromise promise = newPromise();
    return disconnect(promise);
}
/**
 * Closes using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture close() {
    final ChannelPromise promise = newPromise();
    return close(promise);
}
/**
 * Deregisters using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture deregister() {
    final ChannelPromise promise = newPromise();
    return deregister(promise);
}
/**
 * Requests a bind on the next outbound context.
 *
 * @param localAddress the address to bind to, must not be {@code null}
 * @param promise      the promise to notify; void promises are rejected for this operation
 * @return {@code promise}
 * @throws NullPointerException     if {@code localAddress} or {@code promise} is {@code null}
 * @throws IllegalArgumentException if the promise is rejected by isNotValidPromise(...)
 */
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // The promise was cancelled, so the operation is skipped.
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        final Runnable deferred = new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        };
        safeExecute(nextExecutor, deferred, promise, null);
        return promise;
    }
    next.invokeBind(localAddress, promise);
    return promise;
}
/**
 * Calls the handler's bind operation, or passes the request on when the handler must not be invoked
 * yet. Anything thrown by the handler fails {@code promise}.
 */
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
        bind(localAddress, promise);
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Connects to {@code remoteAddress} without a local address, notifying {@code promise}.
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    final SocketAddress noLocalAddress = null;
    return connect(remoteAddress, noLocalAddress, promise);
}
/**
 * Requests a connect on the next outbound context.
 *
 * @param remoteAddress the address to connect to, must not be {@code null}
 * @param localAddress  the local address to use, may be {@code null}
 * @param promise       the promise to notify; void promises are rejected for this operation
 * @return {@code promise}
 * @throws NullPointerException     if {@code remoteAddress} or {@code promise} is {@code null}
 * @throws IllegalArgumentException if the promise is rejected by isNotValidPromise(...)
 */
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
    if (isNotValidPromise(promise, false)) {
        // The promise was cancelled, so the operation is skipped.
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        final Runnable deferred = new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        };
        safeExecute(nextExecutor, deferred, promise, null);
        return promise;
    }
    next.invokeConnect(remoteAddress, localAddress, promise);
    return promise;
}
/**
 * Calls the handler's connect operation, or passes the request on when the handler must not be invoked
 * yet. Anything thrown by the handler fails {@code promise}.
 */
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
        connect(remoteAddress, localAddress, promise);
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Requests a disconnect on the next outbound context. When the channel's metadata reports that it has
 * no notion of disconnect, the request is translated into a close.
 *
 * @param promise the promise to notify; void promises are rejected for this operation
 * @return {@code promise}
 */
@Override
public ChannelFuture disconnect(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // The promise was cancelled, so the operation is skipped.
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        final Runnable deferred = new Runnable() {
            @Override
            public void run() {
                if (channel().metadata().hasDisconnect()) {
                    next.invokeDisconnect(promise);
                } else {
                    next.invokeClose(promise);
                }
            }
        };
        safeExecute(nextExecutor, deferred, promise, null);
        return promise;
    }

    // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
    // So far, UDP/IP is the only transport that has such behavior.
    if (channel().metadata().hasDisconnect()) {
        next.invokeDisconnect(promise);
    } else {
        next.invokeClose(promise);
    }
    return promise;
}
/**
 * Calls the handler's disconnect operation, or passes the request on when the handler must not be
 * invoked yet. Anything thrown by the handler fails {@code promise}.
 */
private void invokeDisconnect(ChannelPromise promise) {
    if (!invokeHandler()) {
        disconnect(promise);
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.disconnect(this, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Requests a close on the next outbound context.
 *
 * @param promise the promise to notify; void promises are rejected for this operation
 * @return {@code promise}
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // The promise was cancelled, so the operation is skipped.
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        final Runnable deferred = new Runnable() {
            @Override
            public void run() {
                next.invokeClose(promise);
            }
        };
        safeExecute(nextExecutor, deferred, promise, null);
        return promise;
    }
    next.invokeClose(promise);
    return promise;
}
/**
 * Calls the handler's close operation, or passes the request on when the handler must not be invoked
 * yet. Anything thrown by the handler fails {@code promise}.
 */
private void invokeClose(ChannelPromise promise) {
    if (!invokeHandler()) {
        close(promise);
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.close(this, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Requests a deregister on the next outbound context.
 *
 * @param promise the promise to notify; void promises are rejected for this operation
 * @return {@code promise}
 */
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // The promise was cancelled, so the operation is skipped.
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (!nextExecutor.inEventLoop()) {
        final Runnable deferred = new Runnable() {
            @Override
            public void run() {
                next.invokeDeregister(promise);
            }
        };
        safeExecute(nextExecutor, deferred, promise, null);
        return promise;
    }
    next.invokeDeregister(promise);
    return promise;
}
/**
 * Calls the handler's deregister operation, or passes the request on when the handler must not be
 * invoked yet. Anything thrown by the handler fails {@code promise}.
 */
private void invokeDeregister(ChannelPromise promise) {
    if (!invokeHandler()) {
        deregister(promise);
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.deregister(this, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Requests a read on the next outbound context: directly when the caller is already on that context's
 * executor, otherwise through a task that is created once and cached on the target context.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeRead();
        return this;
    }
    Runnable cached = next.invokeReadTask;
    if (cached == null) {
        cached = new Runnable() {
            @Override
            public void run() {
                next.invokeRead();
            }
        };
        next.invokeReadTask = cached;
    }
    nextExecutor.execute(cached);
    return this;
}
/**
 * Calls the handler's read operation, or passes the request on when the handler must not be invoked
 * yet. Anything thrown by the handler goes to notifyHandlerException(...).
 */
private void invokeRead() {
    if (!invokeHandler()) {
        read();
        return;
    }
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Writes {@code msg} using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture write(Object msg) {
    final ChannelPromise promise = newPromise();
    return write(msg, promise);
}
/**
 * Writes {@code msg} without flushing. The message is released when the promise turns out to be
 * cancelled or when the promise validation throws.
 *
 * @param msg     the message to write, must not be {@code null}
 * @param promise the promise to notify; a void promise is allowed
 * @return {@code promise}
 * @throws NullPointerException if {@code msg} or {@code promise} is {@code null}
 */
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");

    try {
        final boolean cancelled = isNotValidPromise(promise, true);
        if (cancelled) {
            ReferenceCountUtil.release(msg);
            return promise;
        }
    } catch (RuntimeException e) {
        // The write will not happen, so the message must not be left unreleased.
        ReferenceCountUtil.release(msg);
        throw e;
    }

    write(msg, false, promise);
    return promise;
}
/**
 * Hands the write to this context's handler, or passes it on when the handler must not be invoked yet.
 */
private void invokeWrite(Object msg, ChannelPromise promise) {
    if (!invokeHandler()) {
        write(msg, promise);
        return;
    }
    invokeWrite0(msg, promise);
}
/**
 * Calls the handler's write operation unconditionally. Anything thrown by the handler fails
 * {@code promise}.
 */
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/**
 * Requests a flush on the next outbound context: directly when the caller is already on that context's
 * executor, otherwise through a cached task. A submission failure is reported to the channel's void
 * promise.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();
    if (nextExecutor.inEventLoop()) {
        next.invokeFlush();
        return this;
    }
    Runnable cached = next.invokeFlushTask;
    if (cached == null) {
        cached = new Runnable() {
            @Override
            public void run() {
                next.invokeFlush();
            }
        };
        next.invokeFlushTask = cached;
    }
    final ChannelPromise failureSink = channel().voidPromise();
    safeExecute(nextExecutor, cached, failureSink, null);
    return this;
}
/**
 * Hands the flush to this context's handler, or passes it on when the handler must not be invoked yet.
 */
private void invokeFlush() {
    if (!invokeHandler()) {
        flush();
        return;
    }
    invokeFlush0();
}
/**
 * Calls the handler's flush operation unconditionally. Anything thrown by the handler goes to
 * notifyHandlerException(...).
 */
private void invokeFlush0() {
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
/**
 * Writes {@code msg} and flushes afterwards.
 *
 * <p>The message is released whenever the write is not going to happen: when the promise is already
 * cancelled, and also when the promise validation itself throws (for example because the promise is
 * already done or belongs to another channel). The second case matches what
 * {@link #write(Object, ChannelPromise)} does and avoids leaking the message.
 *
 * @param msg     the message to write, must not be {@code null}
 * @param promise the promise to notify; a void promise is allowed
 * @return {@code promise}
 * @throws NullPointerException     if {@code msg} or {@code promise} is {@code null}
 * @throws IllegalArgumentException if the promise is rejected by isNotValidPromise(...)
 */
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final boolean cancelled;
    try {
        cancelled = isNotValidPromise(promise, true);
    } catch (RuntimeException e) {
        // Ownership of msg was handed to us; release it before reporting the invalid promise.
        ReferenceCountUtil.release(msg);
        throw e;
    }
    if (cancelled) {
        ReferenceCountUtil.release(msg);
        return promise;
    }

    write(msg, true, promise);
    return promise;
}
/**
 * Hands a write followed by a flush to this context's handler, or passes the combined request on when
 * the handler must not be invoked yet.
 */
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (!invokeHandler()) {
        writeAndFlush(msg, promise);
        return;
    }
    invokeWrite0(msg, promise);
    invokeFlush0();
}
/**
 * Dispatches a write (optionally followed by a flush) to the next outbound context.
 *
 * <p>When the caller is on the target context's executor the handler is invoked directly. Otherwise a
 * recyclable write task is submitted to that executor. Creating the task may add its estimated size to
 * the pipeline's pending outbound bytes (see {@code AbstractWriteTask.init}); that amount is normally
 * subtracted again when the task runs. If the executor rejects the task it will never run, so the
 * promise is failed, the message is released and the accounted bytes are subtracted here.
 *
 * @param msg     the message to write
 * @param flush   whether a flush should follow the write
 * @param promise the promise to notify
 */
private void write(Object msg, boolean flush, ChannelPromise promise) {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    final EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
        return;
    }

    final AbstractWriteTask task;
    if (flush) {
        task = WriteAndFlushTask.newInstance(next, m, promise);
    } else {
        task = WriteTask.newInstance(next, m, promise);
    }

    try {
        executor.execute(task);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            try {
                ReferenceCountUtil.release(m);
            } finally {
                // The task was never accepted, so run() will not undo the accounting done on creation.
                if (AbstractWriteTask.ESTIMATE_TASK_SIZE_ON_SUBMIT) {
                    next.pipeline.decrementPendingOutboundBytes(task.size);
                }
            }
        }
    }
}
/**
 * Writes and flushes {@code msg} using a promise freshly created by newPromise().
 */
@Override
public ChannelFuture writeAndFlush(Object msg) {
    final ChannelPromise promise = newPromise();
    return writeAndFlush(msg, promise);
}
/**
 * Tries to fail {@code promise} with {@code cause}. No logger is passed for a VoidChannelPromise
 * because tryFailure(...) is expected to return false for it.
 */
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
    final InternalLogger failureLogger;
    if (promise instanceof VoidChannelPromise) {
        failureLogger = null;
    } else {
        failureLogger = logger;
    }
    PromiseNotificationUtil.tryFailure(promise, cause, failureLogger);
}
/**
 * Routes an exception thrown by a handler callback to this context's exceptionCaught handling. If the
 * exception itself originated inside an exceptionCaught method it is only logged, which prevents the
 * exception from being fed back into exceptionCaught again.
 */
private void notifyHandlerException(Throwable cause) {
    if (!inExceptionCaught(cause)) {
        invokeExceptionCaught(cause);
        return;
    }
    if (logger.isWarnEnabled()) {
        logger.warn(
                "An exception was thrown by a user handler " +
                "while handling an exceptionCaught event", cause);
    }
}
/**
 * Reports whether {@code cause}, or any exception in its cause chain, has a stack frame whose method
 * name is {@code exceptionCaught}. Scanning of one stack trace stops at the first {@code null} frame.
 */
private static boolean inExceptionCaught(Throwable cause) {
    Throwable current = cause;
    do {
        final StackTraceElement[] frames = current.getStackTrace();
        if (frames != null) {
            for (int i = 0; i < frames.length; i++) {
                final StackTraceElement frame = frames[i];
                if (frame == null) {
                    break;
                }
                if ("exceptionCaught".equals(frame.getMethodName())) {
                    return true;
                }
            }
        }
        current = current.getCause();
    } while (current != null);

    return false;
}
/**
 * Creates a new {@link DefaultChannelPromise} for this context's channel and executor.
 */
@Override
public ChannelPromise newPromise() {
    final Channel ch = channel();
    final EventExecutor notifier = executor();
    return new DefaultChannelPromise(ch, notifier);
}
/**
 * Creates a new {@link DefaultChannelProgressivePromise} for this context's channel and executor.
 */
@Override
public ChannelProgressivePromise newProgressivePromise() {
    final Channel ch = channel();
    final EventExecutor notifier = executor();
    return new DefaultChannelProgressivePromise(ch, notifier);
}
/**
 * Returns a succeeded future for this context's channel. The instance is created on first use and then
 * cached in the {@code succeededFuture} field.
 */
@Override
public ChannelFuture newSucceededFuture() {
    ChannelFuture cached = this.succeededFuture;
    if (cached != null) {
        return cached;
    }
    cached = new SucceededChannelFuture(channel(), executor());
    this.succeededFuture = cached;
    return cached;
}
/**
 * Creates a new failed future for this context's channel and executor, carrying {@code cause}.
 */
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
    final Channel ch = channel();
    final EventExecutor notifier = executor();
    return new FailedChannelFuture(ch, notifier, cause);
}
/**
 * Validates a promise passed to an outbound operation.
 *
 * @param promise          the promise to check
 * @param allowVoidPromise whether a VoidChannelPromise is acceptable for the operation
 * @return {@code true} if the promise was cancelled and the operation should be skipped,
 *         {@code false} if the promise can be used
 * @throws NullPointerException     if {@code promise} is {@code null}
 * @throws IllegalArgumentException if the promise is already done without being cancelled, belongs to
 *                                  another channel, is a disallowed void promise, or is a CloseFuture
 */
private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    if (promise.isDone()) {
        // A cancelled promise signals that the operation should not be performed at all.
        //
        // See https://github.com/netty/netty/issues/2349
        if (!promise.isCancelled()) {
            throw new IllegalArgumentException("promise already done: " + promise);
        }
        return true;
    }

    if (promise.channel() != channel()) {
        throw new IllegalArgumentException(String.format(
                "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
    }

    // The common case: a plain DefaultChannelPromise needs no further checks.
    if (promise.getClass() == DefaultChannelPromise.class) {
        return false;
    }

    final boolean isVoid = promise instanceof VoidChannelPromise;
    if (isVoid && !allowVoidPromise) {
        throw new IllegalArgumentException(
                StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
    }

    if (promise instanceof AbstractChannel.CloseFuture) {
        throw new IllegalArgumentException(
                StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
    }
    return false;
}
/**
 * Returns the closest context after this one (following {@code next}) whose {@code inbound} flag is set.
 */
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext candidate = next;
    while (!candidate.inbound) {
        candidate = candidate.next;
    }
    return candidate;
}
/**
 * Returns the closest context before this one (following {@code prev}) whose {@code outbound} flag is set.
 */
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext candidate = prev;
    while (!candidate.outbound) {
        candidate = candidate.prev;
    }
    return candidate;
}
/**
 * Returns the void promise of this context's channel.
 */
@Override
public ChannelPromise voidPromise() {
    final Channel ch = channel();
    return ch.voidPromise();
}
/**
 * Marks this context as removed by setting the handler state to REMOVE_COMPLETE.
 */
final void setRemoved() {
    this.handlerState = REMOVE_COMPLETE;
}
/**
 * Moves the handler state to ADD_COMPLETE unless it is already REMOVE_COMPLETE.
 */
final void setAddComplete() {
    // Never overwrite REMOVE_COMPLETE. The observed state is usually ADD_PENDING but can also be
    // REMOVE_COMPLETE when an EventExecutor is used that does not expose ordering guarantees.
    int observed;
    do {
        observed = handlerState;
    } while (observed != REMOVE_COMPLETE
            && !HANDLER_STATE_UPDATER.compareAndSet(this, observed, ADD_COMPLETE));
}
/**
 * Moves the handler state from INIT to ADD_PENDING.
 */
final void setAddPending() {
    final boolean transitioned = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    // Must always succeed because this is required to be called before setAddComplete() or setRemoved().
    assert transitioned;
}
/**
 * Makes a best effort to detect whether {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} has
 * been called yet. Returns {@code false} if it has not, and {@code true} if it has or if that cannot be
 * determined.
 *
 * When this method returns {@code false} the {@link ChannelHandler} is not invoked and the event is just
 * forwarded. This is needed because {@link DefaultChannelPipeline} may already have put the
 * {@link ChannelHandler} into the linked list without having called
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
 */
private boolean invokeHandler() {
    // Read the volatile field once.
    final int state = this.handlerState;
    if (state == ADD_COMPLETE) {
        return true;
    }
    return !ordered && state == ADD_PENDING;
}
/**
 * Returns {@code true} once the handler state is REMOVE_COMPLETE.
 */
@Override
public boolean isRemoved() {
    final int state = handlerState;
    return REMOVE_COMPLETE == state;
}
/**
 * Returns the attribute for {@code key} as stored on the channel; the lookup is delegated to
 * {@code channel().attr(key)}.
 */
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
    final Channel ch = channel();
    return ch.attr(key);
}
/**
 * Reports whether the channel has an attribute for {@code key}; the check is delegated to
 * {@code channel().hasAttr(key)}.
 */
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
    final Channel ch = channel();
    return ch.hasAttr(key);
}
/**
 * Submits {@code runnable} to {@code executor}. If the submission throws, {@code promise} is failed with
 * the thrown cause and {@code msg}, when not {@code null}, is released.
 */
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable submitFailure) {
        try {
            promise.setFailure(submitFailure);
        } finally {
            // Release even when failing the promise throws.
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}
/**
 * Returns a hint that names this context as the one handling the message from this point on.
 */
@Override
public String toHintString() {
    final StringBuilder hint = new StringBuilder();
    hint.append('\'').append(name).append("' will handle the message from this point.");
    return hint.toString();
}
/**
 * Returns a description made of the simple class name of {@link ChannelHandlerContext}, this context's
 * name and its channel.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append(StringUtil.simpleClassName(ChannelHandlerContext.class));
    text.append('(').append(name).append(", ").append(channel()).append(')');
    return text.toString();
}
/**
 * Base class of the recyclable tasks that perform a write on a context from another thread.
 *
 * <p>An instance is filled by {@link #init} and, after {@link #run()} has finished, cleared and handed
 * back to its {@link Recycler} handle.
 */
abstract static class AbstractWriteTask implements Runnable {

    // When enabled, init(...) adds the estimated task size to the pipeline's pending outbound bytes and
    // run() subtracts it again.
    private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
            SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

    // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
    private static final int WRITE_TASK_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

    private final Recycler.Handle<AbstractWriteTask> handle;
    private AbstractChannelHandlerContext ctx;
    private Object msg;
    private ChannelPromise promise;
    // Bytes accounted against the pipeline for this task; 0 when estimation is disabled.
    private int size;

    @SuppressWarnings("unchecked")
    private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
        this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
    }

    /**
     * Fills {@code task} with the write to perform and, when size estimation is enabled, accounts the
     * estimated size of the message plus the task overhead as pending outbound bytes on the pipeline.
     */
    protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
                               Object msg, ChannelPromise promise) {
        task.ctx = ctx;
        task.msg = msg;
        task.promise = promise;

        if (!ESTIMATE_TASK_SIZE_ON_SUBMIT) {
            task.size = 0;
            return;
        }
        final DefaultChannelPipeline owner = ctx.pipeline;
        task.size = owner.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
        owner.incrementPendingOutboundBytes(task.size);
    }

    /**
     * Undoes the pending-bytes accounting (when enabled), performs the write and finally clears and
     * recycles this task.
     */
    @Override
    public final void run() {
        try {
            if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
                final DefaultChannelPipeline owner = ctx.pipeline;
                owner.decrementPendingOutboundBytes(size);
            }
            write(ctx, msg, promise);
        } finally {
            // Drop the references so the GC can collect them while this task sits in the recycler.
            ctx = null;
            msg = null;
            promise = null;
            handle.recycle(this);
        }
    }

    /**
     * Performs the actual write on {@code ctx}; subclasses may extend this.
     */
    protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.invokeWrite(msg, promise);
    }
}
/**
 * Recyclable task that performs a write without a flush. It is marked as a
 * {@link SingleThreadEventLoop.NonWakeupRunnable}.
 */
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {

    private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
        @Override
        protected WriteTask newObject(Handle<WriteTask> handle) {
            return new WriteTask(handle);
        }
    };

    private WriteTask(Recycler.Handle<WriteTask> handle) {
        super(handle);
    }

    /**
     * Obtains an instance from the recycler and initialises it with the given write.
     */
    private static WriteTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        final WriteTask recycled = RECYCLER.get();
        init(recycled, ctx, msg, promise);
        return recycled;
    }
}
/**
 * Recyclable task that performs a write and then requests a flush on the same context.
 */
static final class WriteAndFlushTask extends AbstractWriteTask {

    private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
        @Override
        protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
            return new WriteAndFlushTask(handle);
        }
    };

    private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
        super(handle);
    }

    /**
     * Obtains an instance from the recycler and initialises it with the given write.
     */
    private static WriteAndFlushTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        final WriteAndFlushTask recycled = RECYCLER.get();
        init(recycled, ctx, msg, promise);
        return recycled;
    }

    /**
     * Performs the write through the superclass and then invokes the flush on {@code ctx}.
     */
    @Override
    public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        super.write(ctx, msg, promise);
        ctx.invokeFlush();
    }
}
}