/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.
AbstractChannel;
import io.netty.channel.
Channel;
import io.netty.channel.
ChannelConfig;
import io.netty.channel.
ChannelMetadata;
import io.netty.channel.
ChannelOutboundBuffer;
import io.netty.channel.
ChannelPipeline;
import io.netty.channel.
ChannelPromise;
import io.netty.channel.
DefaultChannelConfig;
import io.netty.channel.
EventLoop;
import io.netty.channel.
PreferHeapByteBufAllocator;
import io.netty.channel.
RecvByteBufAllocator;
import io.netty.channel.
SingleThreadEventLoop;
import io.netty.util.
ReferenceCountUtil;
import io.netty.util.concurrent.
Future;
import io.netty.util.concurrent.
SingleThreadEventExecutor;
import io.netty.util.internal.
InternalThreadLocalMap;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
ThrowableUtil;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.net.
ConnectException;
import java.net.
SocketAddress;
import java.nio.channels.
AlreadyConnectedException;
import java.nio.channels.
ClosedChannelException;
import java.nio.channels.
ConnectionPendingException;
import java.nio.channels.
NotYetConnectedException;
import java.util.
Queue;
import java.util.concurrent.atomic.
AtomicReferenceFieldUpdater;
/**
* A {@link Channel} for the local transport.
*/
public class
LocalChannel extends
AbstractChannel {
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
LocalChannel.class);
@
SuppressWarnings({ "rawtypes" })
private static final
AtomicReferenceFieldUpdater<
LocalChannel,
Future>
FINISH_READ_FUTURE_UPDATER =
AtomicReferenceFieldUpdater.
newUpdater(
LocalChannel.class,
Future.class, "finishReadFuture");
private static final
ChannelMetadata METADATA = new
ChannelMetadata(false);
private static final int
MAX_READER_STACK_DEPTH = 8;
private static final
ClosedChannelException DO_WRITE_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
LocalChannel.class, "doWrite(...)");
private static final
ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.
unknownStackTrace(
new
ClosedChannelException(),
LocalChannel.class, "doClose()");
private enum
State { OPEN, BOUND, CONNECTED, CLOSED }
private final
ChannelConfig config = new
DefaultChannelConfig(this);
// To further optimize this we could write our own SPSC queue.
final
Queue<
Object>
inboundBuffer =
PlatformDependent.
newSpscQueue();
private final
Runnable readTask = new
Runnable() {
@
Override
public void
run() {
// Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
if (!
inboundBuffer.
isEmpty()) {
readInbound();
}
}
};
private final
Runnable shutdownHook = new
Runnable() {
@
Override
public void
run() {
unsafe().
close(
unsafe().
voidPromise());
}
};
private volatile
State state;
private volatile
LocalChannel peer;
private volatile
LocalAddress localAddress;
private volatile
LocalAddress remoteAddress;
private volatile
ChannelPromise connectPromise;
private volatile boolean
readInProgress;
private volatile boolean
writeInProgress;
private volatile
Future<?>
finishReadFuture;
public
LocalChannel() {
super(null);
config().
setAllocator(new
PreferHeapByteBufAllocator(
config.
getAllocator()));
}
protected
LocalChannel(
LocalServerChannel parent,
LocalChannel peer) {
super(
parent);
config().
setAllocator(new
PreferHeapByteBufAllocator(
config.
getAllocator()));
this.
peer =
peer;
localAddress =
parent.
localAddress();
remoteAddress =
peer.
localAddress();
}
@
Override
public
ChannelMetadata metadata() {
return
METADATA;
}
@
Override
public
ChannelConfig config() {
return
config;
}
@
Override
public
LocalServerChannel parent() {
return (
LocalServerChannel) super.parent();
}
@
Override
public
LocalAddress localAddress() {
return (
LocalAddress) super.localAddress();
}
@
Override
public
LocalAddress remoteAddress() {
return (
LocalAddress) super.remoteAddress();
}
@
Override
public boolean
isOpen() {
return
state !=
State.
CLOSED;
}
@
Override
public boolean
isActive() {
return
state ==
State.
CONNECTED;
}
@
Override
protected
AbstractUnsafe newUnsafe() {
return new
LocalUnsafe();
}
@
Override
protected boolean
isCompatible(
EventLoop loop) {
return
loop instanceof
SingleThreadEventLoop;
}
@
Override
protected
SocketAddress localAddress0() {
return
localAddress;
}
@
Override
protected
SocketAddress remoteAddress0() {
return
remoteAddress;
}
@
Override
protected void
doRegister() throws
Exception {
// Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
// This is needed as a peer may not be null also if a LocalChannel was connected before and
// deregistered / registered later again.
//
// See https://github.com/netty/netty/issues/2400
if (
peer != null &&
parent() != null) {
// Store the peer in a local variable as it may be set to null if doClose() is called.
// See https://github.com/netty/netty/issues/2144
final
LocalChannel peer = this.
peer;
state =
State.
CONNECTED;
peer.
remoteAddress =
parent() == null ? null :
parent().
localAddress();
peer.
state =
State.
CONNECTED;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelActive
// event is triggered *after* this channel's channelRegistered event, so that this channel's
// pipeline is fully initialized by ChannelInitializer before any channelRead events.
peer.
eventLoop().
execute(new
Runnable() {
@
Override
public void
run() {
ChannelPromise promise =
peer.
connectPromise;
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
// connectPromise may be set to null if doClose() was called in the meantime.
if (
promise != null &&
promise.
trySuccess()) {
peer.
pipeline().
fireChannelActive();
}
}
});
}
((
SingleThreadEventExecutor)
eventLoop()).
addShutdownHook(
shutdownHook);
}
@
Override
protected void
doBind(
SocketAddress localAddress) throws
Exception {
this.
localAddress =
LocalChannelRegistry.
register(this, this.
localAddress,
localAddress);
state =
State.
BOUND;
}
@
Override
protected void
doDisconnect() throws
Exception {
doClose();
}
@
Override
protected void
doClose() throws
Exception {
final
LocalChannel peer = this.
peer;
State oldState =
state;
try {
if (
oldState !=
State.
CLOSED) {
// Update all internal state before the closeFuture is notified.
if (
localAddress != null) {
if (
parent() == null) {
LocalChannelRegistry.
unregister(
localAddress);
}
localAddress = null;
}
// State change must happen before finishPeerRead to ensure writes are released either in doWrite or
// channelRead.
state =
State.
CLOSED;
// Preserve order of event and force a read operation now before the close operation is processed.
if (
writeInProgress &&
peer != null) {
finishPeerRead(
peer);
}
ChannelPromise promise =
connectPromise;
if (
promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.
tryFailure(
DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
}
if (
peer != null) {
this.
peer = null;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelInActive
// event is triggered *after* this peer's channelInActive event
EventLoop peerEventLoop =
peer.
eventLoop();
final boolean
peerIsActive =
peer.
isActive();
try {
peerEventLoop.
execute(new
Runnable() {
@
Override
public void
run() {
peer.
tryClose(
peerIsActive);
}
});
} catch (
Throwable cause) {
logger.
warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this,
peer,
cause);
if (
peerEventLoop.
inEventLoop()) {
peer.
releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.
close();
}
PlatformDependent.
throwException(
cause);
}
}
} finally {
// Release all buffers if the Channel was already registered in the past and if it was not closed before.
if (
oldState != null &&
oldState !=
State.
CLOSED) {
// We need to release all the buffers that may be put into our inbound queue since we closed the Channel
// to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
// means even if the promise was notified before its not really guaranteed that the "remote peer" will
// see the buffer at all.
releaseInboundBuffers();
}
}
}
private void
tryClose(boolean
isActive) {
if (
isActive) {
unsafe().
close(
unsafe().
voidPromise());
} else {
releaseInboundBuffers();
}
}
@
Override
protected void
doDeregister() throws
Exception {
// Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
((
SingleThreadEventExecutor)
eventLoop()).
removeShutdownHook(
shutdownHook);
}
private void
readInbound() {
RecvByteBufAllocator.
Handle handle =
unsafe().
recvBufAllocHandle();
handle.
reset(
config());
ChannelPipeline pipeline =
pipeline();
do {
Object received =
inboundBuffer.
poll();
if (
received == null) {
break;
}
pipeline.
fireChannelRead(
received);
} while (
handle.
continueReading());
pipeline.
fireChannelReadComplete();
}
@
Override
protected void
doBeginRead() throws
Exception {
if (
readInProgress) {
return;
}
Queue<
Object>
inboundBuffer = this.
inboundBuffer;
if (
inboundBuffer.
isEmpty()) {
readInProgress = true;
return;
}
final
InternalThreadLocalMap threadLocals =
InternalThreadLocalMap.
get();
final
Integer stackDepth =
threadLocals.
localChannelReaderStackDepth();
if (
stackDepth <
MAX_READER_STACK_DEPTH) {
threadLocals.
setLocalChannelReaderStackDepth(
stackDepth + 1);
try {
readInbound();
} finally {
threadLocals.
setLocalChannelReaderStackDepth(
stackDepth);
}
} else {
try {
eventLoop().
execute(
readTask);
} catch (
Throwable cause) {
logger.
warn("Closing Local channels {}-{} because exception occurred!", this,
peer,
cause);
close();
peer.
close();
PlatformDependent.
throwException(
cause);
}
}
}
@
Override
protected void
doWrite(
ChannelOutboundBuffer in) throws
Exception {
switch (
state) {
case
OPEN:
case
BOUND:
throw new
NotYetConnectedException();
case
CLOSED:
throw
DO_WRITE_CLOSED_CHANNEL_EXCEPTION;
case
CONNECTED:
break;
}
final
LocalChannel peer = this.
peer;
writeInProgress = true;
try {
for (;;) {
Object msg =
in.
current();
if (
msg == null) {
break;
}
try {
// It is possible the peer could have closed while we are writing, and in this case we should
// simulate real socket behavior and ensure the write operation is failed.
if (
peer.
state ==
State.
CONNECTED) {
peer.
inboundBuffer.
add(
ReferenceCountUtil.
retain(
msg));
in.
remove();
} else {
in.
remove(
DO_WRITE_CLOSED_CHANNEL_EXCEPTION);
}
} catch (
Throwable cause) {
in.
remove(
cause);
}
}
} finally {
// The following situation may cause trouble:
// 1. Write (with promise X)
// 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
// 3. Then the close event will be executed for the peer before the write events, when the write events
// actually happened before the close event.
writeInProgress = false;
}
finishPeerRead(
peer);
}
private void
finishPeerRead(final
LocalChannel peer) {
// If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
if (
peer.
eventLoop() ==
eventLoop() && !
peer.
writeInProgress) {
finishPeerRead0(
peer);
} else {
runFinishPeerReadTask(
peer);
}
}
private void
runFinishPeerReadTask(final
LocalChannel peer) {
// If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
// we keep track of the task, and coordinate later that our read can't happen until the peer is done.
final
Runnable finishPeerReadTask = new
Runnable() {
@
Override
public void
run() {
finishPeerRead0(
peer);
}
};
try {
if (
peer.
writeInProgress) {
peer.
finishReadFuture =
peer.
eventLoop().
submit(
finishPeerReadTask);
} else {
peer.
eventLoop().
execute(
finishPeerReadTask);
}
} catch (
Throwable cause) {
logger.
warn("Closing Local channels {}-{} because exception occurred!", this,
peer,
cause);
close();
peer.
close();
PlatformDependent.
throwException(
cause);
}
}
private void
releaseInboundBuffers() {
assert
eventLoop() == null ||
eventLoop().
inEventLoop();
readInProgress = false;
Queue<
Object>
inboundBuffer = this.
inboundBuffer;
Object msg;
while ((
msg =
inboundBuffer.
poll()) != null) {
ReferenceCountUtil.
release(
msg);
}
}
private void
finishPeerRead0(
LocalChannel peer) {
Future<?>
peerFinishReadFuture =
peer.
finishReadFuture;
if (
peerFinishReadFuture != null) {
if (!
peerFinishReadFuture.
isDone()) {
runFinishPeerReadTask(
peer);
return;
} else {
// Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
FINISH_READ_FUTURE_UPDATER.
compareAndSet(
peer,
peerFinishReadFuture, null);
}
}
// We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
// forward data later on.
if (
peer.
readInProgress && !
peer.
inboundBuffer.
isEmpty()) {
peer.
readInProgress = false;
peer.
readInbound();
}
}
private class
LocalUnsafe extends
AbstractUnsafe {
@
Override
public void
connect(final
SocketAddress remoteAddress,
SocketAddress localAddress, final
ChannelPromise promise) {
if (!
promise.
setUncancellable() || !
ensureOpen(
promise)) {
return;
}
if (
state ==
State.
CONNECTED) {
Exception cause = new
AlreadyConnectedException();
safeSetFailure(
promise,
cause);
pipeline().
fireExceptionCaught(
cause);
return;
}
if (
connectPromise != null) {
throw new
ConnectionPendingException();
}
connectPromise =
promise;
if (
state !=
State.
BOUND) {
// Not bound yet and no localAddress specified - get one.
if (
localAddress == null) {
localAddress = new
LocalAddress(
LocalChannel.this);
}
}
if (
localAddress != null) {
try {
doBind(
localAddress);
} catch (
Throwable t) {
safeSetFailure(
promise,
t);
close(
voidPromise());
return;
}
}
Channel boundChannel =
LocalChannelRegistry.
get(
remoteAddress);
if (!(
boundChannel instanceof
LocalServerChannel)) {
Exception cause = new
ConnectException("connection refused: " +
remoteAddress);
safeSetFailure(
promise,
cause);
close(
voidPromise());
return;
}
LocalServerChannel serverChannel = (
LocalServerChannel)
boundChannel;
peer =
serverChannel.
serve(
LocalChannel.this);
}
}
}