/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.
ByteBufAllocator;
import io.netty.channel.socket.
ChannelOutputShutdownEvent;
import io.netty.channel.socket.
ChannelOutputShutdownException;
import io.netty.util.
DefaultAttributeMap;
import io.netty.util.
ReferenceCountUtil;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
ThrowableUtil;
import io.netty.util.internal.
UnstableApi;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.io.
IOException;
import java.net.
ConnectException;
import java.net.
InetSocketAddress;
import java.net.
NoRouteToHostException;
import java.net.
SocketAddress;
import java.net.
SocketException;
import java.nio.channels.
ClosedChannelException;
import java.nio.channels.
NotYetConnectedException;
import java.util.concurrent.
Executor;
import java.util.concurrent.
RejectedExecutionException;
/**
* A skeletal {@link Channel} implementation.
*/
public abstract class
AbstractChannel extends
DefaultAttributeMap implements
Channel {
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
AbstractChannel.class);
private static final
ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
AbstractUnsafe.class, "flush0()");
private static final
ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
AbstractUnsafe.class, "ensureOpen(...)");
private static final
ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
AbstractUnsafe.class, "close(...)");
private static final
ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
AbstractUnsafe.class, "write(...)");
private static final
NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
NotYetConnectedException(),
AbstractUnsafe.class, "flush0()");
private final
Channel parent;
private final
ChannelId id;
private final
Unsafe unsafe;
private final
DefaultChannelPipeline pipeline;
private final
VoidChannelPromise unsafeVoidPromise = new
VoidChannelPromise(this, false);
private final
CloseFuture closeFuture = new
CloseFuture(this);
private volatile
SocketAddress localAddress;
private volatile
SocketAddress remoteAddress;
private volatile
EventLoop eventLoop;
private volatile boolean
registered;
private boolean
closeInitiated;
/** Cache for the string representation of this channel */
private boolean
strValActive;
private
String strVal;
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected
AbstractChannel(
Channel parent) {
this.
parent =
parent;
id =
newId();
unsafe =
newUnsafe();
pipeline =
newChannelPipeline();
}
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected
AbstractChannel(
Channel parent,
ChannelId id) {
this.
parent =
parent;
this.
id =
id;
unsafe =
newUnsafe();
pipeline =
newChannelPipeline();
}
@
Override
public final
ChannelId id() {
return
id;
}
/**
* Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
* {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
*/
protected
ChannelId newId() {
return
DefaultChannelId.
newInstance();
}
/**
* Returns a new {@link DefaultChannelPipeline} instance.
*/
protected
DefaultChannelPipeline newChannelPipeline() {
return new
DefaultChannelPipeline(this);
}
@
Override
public boolean
isWritable() {
ChannelOutboundBuffer buf =
unsafe.
outboundBuffer();
return
buf != null &&
buf.
isWritable();
}
@
Override
public long
bytesBeforeUnwritable() {
ChannelOutboundBuffer buf =
unsafe.
outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return
buf != null ?
buf.
bytesBeforeUnwritable() : 0;
}
@
Override
public long
bytesBeforeWritable() {
ChannelOutboundBuffer buf =
unsafe.
outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return
buf != null ?
buf.
bytesBeforeWritable() :
Long.
MAX_VALUE;
}
@
Override
public
Channel parent() {
return
parent;
}
@
Override
public
ChannelPipeline pipeline() {
return
pipeline;
}
@
Override
public
ByteBufAllocator alloc() {
return
config().
getAllocator();
}
@
Override
public
EventLoop eventLoop() {
EventLoop eventLoop = this.
eventLoop;
if (
eventLoop == null) {
throw new
IllegalStateException("channel not registered to an event loop");
}
return
eventLoop;
}
@
Override
public
SocketAddress localAddress() {
SocketAddress localAddress = this.
localAddress;
if (
localAddress == null) {
try {
this.
localAddress =
localAddress =
unsafe().
localAddress();
} catch (
Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return
localAddress;
}
/**
* @deprecated no use-case for this.
*/
@
Deprecated
protected void
invalidateLocalAddress() {
localAddress = null;
}
@
Override
public
SocketAddress remoteAddress() {
SocketAddress remoteAddress = this.
remoteAddress;
if (
remoteAddress == null) {
try {
this.
remoteAddress =
remoteAddress =
unsafe().
remoteAddress();
} catch (
Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return
remoteAddress;
}
/**
* @deprecated no use-case for this.
*/
@
Deprecated
protected void
invalidateRemoteAddress() {
remoteAddress = null;
}
@
Override
public boolean
isRegistered() {
return
registered;
}
@
Override
public
ChannelFuture bind(
SocketAddress localAddress) {
return
pipeline.
bind(
localAddress);
}
@
Override
public
ChannelFuture connect(
SocketAddress remoteAddress) {
return
pipeline.
connect(
remoteAddress);
}
@
Override
public
ChannelFuture connect(
SocketAddress remoteAddress,
SocketAddress localAddress) {
return
pipeline.
connect(
remoteAddress,
localAddress);
}
@
Override
public
ChannelFuture disconnect() {
return
pipeline.
disconnect();
}
@
Override
public
ChannelFuture close() {
return
pipeline.
close();
}
@
Override
public
ChannelFuture deregister() {
return
pipeline.
deregister();
}
@
Override
public
Channel flush() {
pipeline.
flush();
return this;
}
@
Override
public
ChannelFuture bind(
SocketAddress localAddress,
ChannelPromise promise) {
return
pipeline.
bind(
localAddress,
promise);
}
@
Override
public
ChannelFuture connect(
SocketAddress remoteAddress,
ChannelPromise promise) {
return
pipeline.
connect(
remoteAddress,
promise);
}
@
Override
public
ChannelFuture connect(
SocketAddress remoteAddress,
SocketAddress localAddress,
ChannelPromise promise) {
return
pipeline.
connect(
remoteAddress,
localAddress,
promise);
}
@
Override
public
ChannelFuture disconnect(
ChannelPromise promise) {
return
pipeline.
disconnect(
promise);
}
@
Override
public
ChannelFuture close(
ChannelPromise promise) {
return
pipeline.
close(
promise);
}
@
Override
public
ChannelFuture deregister(
ChannelPromise promise) {
return
pipeline.
deregister(
promise);
}
@
Override
public
Channel read() {
pipeline.
read();
return this;
}
@
Override
public
ChannelFuture write(
Object msg) {
return
pipeline.
write(
msg);
}
@
Override
public
ChannelFuture write(
Object msg,
ChannelPromise promise) {
return
pipeline.
write(
msg,
promise);
}
@
Override
public
ChannelFuture writeAndFlush(
Object msg) {
return
pipeline.
writeAndFlush(
msg);
}
@
Override
public
ChannelFuture writeAndFlush(
Object msg,
ChannelPromise promise) {
return
pipeline.
writeAndFlush(
msg,
promise);
}
@
Override
public
ChannelPromise newPromise() {
return
pipeline.
newPromise();
}
@
Override
public
ChannelProgressivePromise newProgressivePromise() {
return
pipeline.
newProgressivePromise();
}
@
Override
public
ChannelFuture newSucceededFuture() {
return
pipeline.
newSucceededFuture();
}
@
Override
public
ChannelFuture newFailedFuture(
Throwable cause) {
return
pipeline.
newFailedFuture(
cause);
}
@
Override
public
ChannelFuture closeFuture() {
return
closeFuture;
}
@
Override
public
Unsafe unsafe() {
return
unsafe;
}
/**
* Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
*/
protected abstract
AbstractUnsafe newUnsafe();
/**
* Returns the ID of this channel.
*/
@
Override
public final int
hashCode() {
return
id.
hashCode();
}
/**
* Returns {@code true} if and only if the specified object is identical
* with this channel (i.e: {@code this == o}).
*/
@
Override
public final boolean
equals(
Object o) {
return this ==
o;
}
@
Override
public final int
compareTo(
Channel o) {
if (this ==
o) {
return 0;
}
return
id().
compareTo(
o.
id());
}
/**
* Returns the {@link String} representation of this channel. The returned
* string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
* and {@linkplain #remoteAddress() remote address} of this channel for
* easier identification.
*/
@
Override
public
String toString() {
boolean
active =
isActive();
if (
strValActive ==
active &&
strVal != null) {
return
strVal;
}
SocketAddress remoteAddr =
remoteAddress();
SocketAddress localAddr =
localAddress();
if (
remoteAddr != null) {
StringBuilder buf = new
StringBuilder(96)
.
append("[id: 0x")
.
append(
id.
asShortText())
.
append(", L:")
.
append(
localAddr)
.
append(
active? " - " : " ! ")
.
append("R:")
.
append(
remoteAddr)
.
append(']');
strVal =
buf.
toString();
} else if (
localAddr != null) {
StringBuilder buf = new
StringBuilder(64)
.
append("[id: 0x")
.
append(
id.
asShortText())
.
append(", L:")
.
append(
localAddr)
.
append(']');
strVal =
buf.
toString();
} else {
StringBuilder buf = new
StringBuilder(16)
.
append("[id: 0x")
.
append(
id.
asShortText())
.
append(']');
strVal =
buf.
toString();
}
strValActive =
active;
return
strVal;
}
@
Override
public final
ChannelPromise voidPromise() {
return
pipeline.
voidPromise();
}
/**
* {@link Unsafe} implementation which sub-classes must extend and use.
*/
protected abstract class
AbstractUnsafe implements
Unsafe {
private volatile
ChannelOutboundBuffer outboundBuffer = new
ChannelOutboundBuffer(
AbstractChannel.this);
private
RecvByteBufAllocator.
Handle recvHandle;
private boolean
inFlush0;
/** true if the channel has never been registered, false otherwise */
private boolean
neverRegistered = true;
private void
assertEventLoop() {
assert !
registered ||
eventLoop.
inEventLoop();
}
@
Override
public
RecvByteBufAllocator.
Handle recvBufAllocHandle() {
if (
recvHandle == null) {
recvHandle =
config().
getRecvByteBufAllocator().
newHandle();
}
return
recvHandle;
}
@
Override
public final
ChannelOutboundBuffer outboundBuffer() {
return
outboundBuffer;
}
@
Override
public final
SocketAddress localAddress() {
return
localAddress0();
}
@
Override
public final
SocketAddress remoteAddress() {
return
remoteAddress0();
}
@
Override
public final void
register(
EventLoop eventLoop, final
ChannelPromise promise) {
if (
eventLoop == null) {
throw new
NullPointerException("eventLoop");
}
if (
isRegistered()) {
promise.
setFailure(new
IllegalStateException("registered to an event loop already"));
return;
}
if (!
isCompatible(
eventLoop)) {
promise.
setFailure(
new
IllegalStateException("incompatible event loop type: " +
eventLoop.
getClass().
getName()));
return;
}
AbstractChannel.this.
eventLoop =
eventLoop;
if (
eventLoop.
inEventLoop()) {
register0(
promise);
} else {
try {
eventLoop.
execute(new
Runnable() {
@
Override
public void
run() {
register0(
promise);
}
});
} catch (
Throwable t) {
logger.
warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this,
t);
closeForcibly();
closeFuture.
setClosed();
safeSetFailure(
promise,
t);
}
}
}
private void
register0(
ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!
promise.
setUncancellable() || !
ensureOpen(
promise)) {
return;
}
boolean
firstRegistration =
neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.
invokeHandlerAddedIfNeeded();
safeSetSuccess(
promise);
pipeline.
fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (
isActive()) {
if (
firstRegistration) {
pipeline.
fireChannelActive();
} else if (
config().
isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (
Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.
setClosed();
safeSetFailure(
promise,
t);
}
}
@
Override
public final void
bind(final
SocketAddress localAddress, final
ChannelPromise promise) {
assertEventLoop();
if (!
promise.
setUncancellable() || !
ensureOpen(
promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (
Boolean.
TRUE.
equals(
config().
getOption(
ChannelOption.
SO_BROADCAST)) &&
localAddress instanceof
InetSocketAddress &&
!((
InetSocketAddress)
localAddress).
getAddress().
isAnyLocalAddress() &&
!
PlatformDependent.
isWindows() && !
PlatformDependent.
maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.
warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" +
localAddress + ") anyway as requested.");
}
boolean
wasActive =
isActive();
try {
doBind(
localAddress);
} catch (
Throwable t) {
safeSetFailure(
promise,
t);
closeIfClosed();
return;
}
if (!
wasActive &&
isActive()) {
invokeLater(new
Runnable() {
@
Override
public void
run() {
pipeline.
fireChannelActive();
}
});
}
safeSetSuccess(
promise);
}
@
Override
public final void
disconnect(final
ChannelPromise promise) {
assertEventLoop();
if (!
promise.
setUncancellable()) {
return;
}
boolean
wasActive =
isActive();
try {
doDisconnect();
} catch (
Throwable t) {
safeSetFailure(
promise,
t);
closeIfClosed();
return;
}
if (
wasActive && !
isActive()) {
invokeLater(new
Runnable() {
@
Override
public void
run() {
pipeline.
fireChannelInactive();
}
});
}
safeSetSuccess(
promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
@
Override
public final void
close(final
ChannelPromise promise) {
assertEventLoop();
close(
promise,
CLOSE_CLOSED_CHANNEL_EXCEPTION,
CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
/**
* Shutdown the output portion of the corresponding {@link Channel}.
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
*/
@
UnstableApi
public final void
shutdownOutput(final
ChannelPromise promise) {
assertEventLoop();
shutdownOutput(
promise, null);
}
/**
* Shutdown the output portion of the corresponding {@link Channel}.
* For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
* @param cause The cause which may provide rational for the shutdown.
*/
private void
shutdownOutput(final
ChannelPromise promise,
Throwable cause) {
if (!
promise.
setUncancellable()) {
return;
}
final
ChannelOutboundBuffer outboundBuffer = this.
outboundBuffer;
if (
outboundBuffer == null) {
promise.
setFailure(
CLOSE_CLOSED_CHANNEL_EXCEPTION);
return;
}
this.
outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
final
Throwable shutdownCause =
cause == null ?
new
ChannelOutputShutdownException("Channel output shutdown") :
new
ChannelOutputShutdownException("Channel output shutdown",
cause);
Executor closeExecutor =
prepareToClose();
if (
closeExecutor != null) {
closeExecutor.
execute(new
Runnable() {
@
Override
public void
run() {
try {
// Execute the shutdown.
doShutdownOutput();
promise.
setSuccess();
} catch (
Throwable err) {
promise.
setFailure(
err);
} finally {
// Dispatch to the EventLoop
eventLoop().
execute(new
Runnable() {
@
Override
public void
run() {
closeOutboundBufferForShutdown(
pipeline,
outboundBuffer,
shutdownCause);
}
});
}
}
});
} else {
try {
// Execute the shutdown.
doShutdownOutput();
promise.
setSuccess();
} catch (
Throwable err) {
promise.
setFailure(
err);
} finally {
closeOutboundBufferForShutdown(
pipeline,
outboundBuffer,
shutdownCause);
}
}
}
private void
closeOutboundBufferForShutdown(
ChannelPipeline pipeline,
ChannelOutboundBuffer buffer,
Throwable cause) {
buffer.
failFlushed(
cause, false);
buffer.
close(
cause, true);
pipeline.
fireUserEventTriggered(
ChannelOutputShutdownEvent.
INSTANCE);
}
private void
close(final
ChannelPromise promise, final
Throwable cause,
final
ClosedChannelException closeCause, final boolean
notify) {
if (!
promise.
setUncancellable()) {
return;
}
if (
closeInitiated) {
if (
closeFuture.
isDone()) {
// Closed already.
safeSetSuccess(
promise);
} else if (!(
promise instanceof
VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.
addListener(new
ChannelFutureListener() {
@
Override
public void
operationComplete(
ChannelFuture future) throws
Exception {
promise.
setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean
wasActive =
isActive();
final
ChannelOutboundBuffer outboundBuffer = this.
outboundBuffer;
this.
outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor =
prepareToClose();
if (
closeExecutor != null) {
closeExecutor.
execute(new
Runnable() {
@
Override
public void
run() {
try {
// Execute the close.
doClose0(
promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new
Runnable() {
@
Override
public void
run() {
if (
outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.
failFlushed(
cause,
notify);
outboundBuffer.
close(
closeCause);
}
fireChannelInactiveAndDeregister(
wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(
promise);
} finally {
if (
outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.
failFlushed(
cause,
notify);
outboundBuffer.
close(
closeCause);
}
}
if (
inFlush0) {
invokeLater(new
Runnable() {
@
Override
public void
run() {
fireChannelInactiveAndDeregister(
wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(
wasActive);
}
}
}
private void
doClose0(
ChannelPromise promise) {
try {
doClose();
closeFuture.
setClosed();
safeSetSuccess(
promise);
} catch (
Throwable t) {
closeFuture.
setClosed();
safeSetFailure(
promise,
t);
}
}
private void
fireChannelInactiveAndDeregister(final boolean
wasActive) {
deregister(
voidPromise(),
wasActive && !
isActive());
}
@
Override
public final void
closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (
Exception e) {
logger.
warn("Failed to close a channel.",
e);
}
}
@
Override
public final void
deregister(final
ChannelPromise promise) {
assertEventLoop();
deregister(
promise, false);
}
private void
deregister(final
ChannelPromise promise, final boolean
fireChannelInactive) {
if (!
promise.
setUncancellable()) {
return;
}
if (!
registered) {
safeSetSuccess(
promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new
Runnable() {
@
Override
public void
run() {
try {
doDeregister();
} catch (
Throwable t) {
logger.
warn("Unexpected exception occurred while deregistering a channel.",
t);
} finally {
if (
fireChannelInactive) {
pipeline.
fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (
registered) {
registered = false;
pipeline.
fireChannelUnregistered();
}
safeSetSuccess(
promise);
}
}
});
}
@
Override
public final void
beginRead() {
assertEventLoop();
if (!
isActive()) {
return;
}
try {
doBeginRead();
} catch (final
Exception e) {
invokeLater(new
Runnable() {
@
Override
public void
run() {
pipeline.
fireExceptionCaught(
e);
}
});
close(
voidPromise());
}
}
@
Override
public final void
write(
Object msg,
ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.
outboundBuffer;
if (
outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(
promise,
WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.
release(
msg);
return;
}
int
size;
try {
msg =
filterOutboundMessage(
msg);
size =
pipeline.
estimatorHandle().
size(
msg);
if (
size < 0) {
size = 0;
}
} catch (
Throwable t) {
safeSetFailure(
promise,
t);
ReferenceCountUtil.
release(
msg);
return;
}
outboundBuffer.
addMessage(
msg,
size,
promise);
}
@
Override
public final void
flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.
outboundBuffer;
if (
outboundBuffer == null) {
return;
}
outboundBuffer.
addFlush();
flush0();
}
@
SuppressWarnings("deprecation")
protected void
flush0() {
if (
inFlush0) {
// Avoid re-entrance
return;
}
final
ChannelOutboundBuffer outboundBuffer = this.
outboundBuffer;
if (
outboundBuffer == null ||
outboundBuffer.
isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!
isActive()) {
try {
if (
isOpen()) {
outboundBuffer.
failFlushed(
FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.
failFlushed(
FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(
outboundBuffer);
} catch (
Throwable t) {
if (
t instanceof
IOException &&
config().
isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(
voidPromise(),
t,
FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(
voidPromise(),
t);
} catch (
Throwable t2) {
close(
voidPromise(),
t2,
FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
@
Override
public final
ChannelPromise voidPromise() {
assertEventLoop();
return
unsafeVoidPromise;
}
protected final boolean
ensureOpen(
ChannelPromise promise) {
if (
isOpen()) {
return true;
}
safeSetFailure(
promise,
ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
return false;
}
/**
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
*/
protected final void
safeSetSuccess(
ChannelPromise promise) {
if (!(
promise instanceof
VoidChannelPromise) && !
promise.
trySuccess()) {
logger.
warn("Failed to mark a promise as success because it is done already: {}",
promise);
}
}
/**
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
*/
protected final void
safeSetFailure(
ChannelPromise promise,
Throwable cause) {
if (!(
promise instanceof
VoidChannelPromise) && !
promise.
tryFailure(
cause)) {
logger.
warn("Failed to mark a promise as failure because it's done already: {}",
promise,
cause);
}
}
protected final void
closeIfClosed() {
if (
isOpen()) {
return;
}
close(
voidPromise());
}
private void
invokeLater(
Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().
execute(
task);
} catch (
RejectedExecutionException e) {
logger.
warn("Can't invoke task later as EventLoop rejected it",
e);
}
}
/**
* Appends the remote address to the message of the exceptions caused by connection attempt failure.
*/
protected final
Throwable annotateConnectException(
Throwable cause,
SocketAddress remoteAddress) {
if (
cause instanceof
ConnectException) {
return new
AnnotatedConnectException((
ConnectException)
cause,
remoteAddress);
}
if (
cause instanceof
NoRouteToHostException) {
return new
AnnotatedNoRouteToHostException((
NoRouteToHostException)
cause,
remoteAddress);
}
if (
cause instanceof
SocketException) {
return new
AnnotatedSocketException((
SocketException)
cause,
remoteAddress);
}
return
cause;
}
/**
* Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
* caller must call the {@link Executor#execute(Runnable)} method with a task that calls
* {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
* {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
*/
protected
Executor prepareToClose() {
return null;
}
}
/**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/
protected abstract boolean
isCompatible(
EventLoop loop);
/**
* Returns the {@link SocketAddress} which is bound locally.
*/
protected abstract
SocketAddress localAddress0();
/**
* Return the {@link SocketAddress} which the {@link Channel} is connected to.
*/
protected abstract
SocketAddress remoteAddress0();
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
*
* Sub-classes may override this method
*/
protected void
doRegister() throws
Exception {
// NOOP
}
/**
* Bind the {@link Channel} to the {@link SocketAddress}
*/
protected abstract void
doBind(
SocketAddress localAddress) throws
Exception;
/**
* Disconnect this {@link Channel} from its remote peer
*/
protected abstract void
doDisconnect() throws
Exception;
/**
* Close the {@link Channel}
*/
protected abstract void
doClose() throws
Exception;
/**
* Called when conditions justify shutting down the output portion of the channel. This may happen if a write
* operation throws an exception.
*/
@
UnstableApi
protected void
doShutdownOutput() throws
Exception {
doClose();
}
/**
* Deregister the {@link Channel} from its {@link EventLoop}.
*
* Sub-classes may override this method
*/
protected void
doDeregister() throws
Exception {
// NOOP
}
/**
* Schedule a read operation.
*/
protected abstract void
doBeginRead() throws
Exception;
/**
* Flush the content of the given buffer to the remote peer.
*/
protected abstract void
doWrite(
ChannelOutboundBuffer in) throws
Exception;
/**
* Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
* the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
*/
protected
Object filterOutboundMessage(
Object msg) throws
Exception {
return
msg;
}
static final class
CloseFuture extends
DefaultChannelPromise {
CloseFuture(
AbstractChannel ch) {
super(
ch);
}
@
Override
public
ChannelPromise setSuccess() {
throw new
IllegalStateException();
}
@
Override
public
ChannelPromise setFailure(
Throwable cause) {
throw new
IllegalStateException();
}
@
Override
public boolean
trySuccess() {
throw new
IllegalStateException();
}
@
Override
public boolean
tryFailure(
Throwable cause) {
throw new
IllegalStateException();
}
boolean
setClosed() {
return super.trySuccess();
}
}
private static final class
AnnotatedConnectException extends
ConnectException {
private static final long
serialVersionUID = 3901958112696433556L;
AnnotatedConnectException(
ConnectException exception,
SocketAddress remoteAddress) {
super(
exception.
getMessage() + ": " +
remoteAddress);
initCause(
exception);
setStackTrace(
exception.
getStackTrace());
}
@
Override
public
Throwable fillInStackTrace() {
return this;
}
}
private static final class
AnnotatedNoRouteToHostException extends
NoRouteToHostException {
private static final long
serialVersionUID = -6801433937592080623L;
AnnotatedNoRouteToHostException(
NoRouteToHostException exception,
SocketAddress remoteAddress) {
super(
exception.
getMessage() + ": " +
remoteAddress);
initCause(
exception);
setStackTrace(
exception.
getStackTrace());
}
@
Override
public
Throwable fillInStackTrace() {
return this;
}
}
private static final class
AnnotatedSocketException extends
SocketException {
private static final long
serialVersionUID = 3896743275010454039L;
AnnotatedSocketException(
SocketException exception,
SocketAddress remoteAddress) {
super(
exception.
getMessage() + ": " +
remoteAddress);
initCause(
exception);
setStackTrace(
exception.
getStackTrace());
}
@
Override
public
Throwable fillInStackTrace() {
return this;
}
}
}