/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.
ByteBuf;
import io.netty.buffer.
ByteBufAllocator;
import io.netty.buffer.
CompositeByteBuf;
import io.netty.buffer.
Unpooled;
import io.netty.channel.
ChannelHandlerContext;
import io.netty.channel.
ChannelInboundHandlerAdapter;
import io.netty.channel.socket.
ChannelInputShutdownEvent;
import io.netty.util.internal.
StringUtil;
import java.util.
List;
/**
* {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
*
* For example here is an implementation which reads all readable bytes from
* the input {@link ByteBuf} and create a new {@link ByteBuf}.
*
* <pre>
* public class SquareDecoder extends {@link ByteToMessageDecoder} {
* {@code @Override}
* public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
* throws {@link Exception} {
* out.add(in.readBytes(in.readableBytes()));
* }
* }
* </pre>
*
* <h3>Frame detection</h3>
* <p>
* Generally frame detection should be handled earlier in the pipeline by adding a
* {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
* or {@link LineBasedFrameDecoder}.
* <p>
* If a custom frame decoder is required, then one needs to be careful when implementing
* one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
* complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
* for a complete frame, return without modifying the reader index to allow more bytes to arrive.
* <p>
* To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
* One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
* For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
* is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
* <h3>Pitfalls</h3>
* <p>
* Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
* annotated with {@link @Sharable}.
* <p>
* Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
* to avoid leaking memory.
*/
public abstract class
ByteToMessageDecoder extends
ChannelInboundHandlerAdapter {
/**
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
public static final
Cumulator MERGE_CUMULATOR = new
Cumulator() {
@
Override
public
ByteBuf cumulate(
ByteBufAllocator alloc,
ByteBuf cumulation,
ByteBuf in) {
final
ByteBuf buffer;
if (
cumulation.
writerIndex() >
cumulation.
maxCapacity() -
in.
readableBytes()
||
cumulation.
refCnt() > 1 ||
cumulation.
isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer =
expandCumulation(
alloc,
cumulation,
in.
readableBytes());
} else {
buffer =
cumulation;
}
buffer.
writeBytes(
in);
in.
release();
return
buffer;
}
};
/**
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
* and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
*/
public static final
Cumulator COMPOSITE_CUMULATOR = new
Cumulator() {
@
Override
public
ByteBuf cumulate(
ByteBufAllocator alloc,
ByteBuf cumulation,
ByteBuf in) {
ByteBuf buffer;
if (
cumulation.
refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
// use slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer =
expandCumulation(
alloc,
cumulation,
in.
readableBytes());
buffer.
writeBytes(
in);
in.
release();
} else {
CompositeByteBuf composite;
if (
cumulation instanceof
CompositeByteBuf) {
composite = (
CompositeByteBuf)
cumulation;
} else {
composite =
alloc.
compositeBuffer(
Integer.
MAX_VALUE);
composite.
addComponent(true,
cumulation);
}
composite.
addComponent(true,
in);
buffer =
composite;
}
return
buffer;
}
};
private static final byte
STATE_INIT = 0;
private static final byte
STATE_CALLING_CHILD_DECODE = 1;
private static final byte
STATE_HANDLER_REMOVED_PENDING = 2;
ByteBuf cumulation;
private
Cumulator cumulator =
MERGE_CUMULATOR;
private boolean
singleDecode;
private boolean
decodeWasNull;
private boolean
first;
/**
* A bitmask where the bits are defined as
* <ul>
* <li>{@link #STATE_INIT}</li>
* <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
* <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
* </ul>
*/
private byte
decodeState =
STATE_INIT;
private int
discardAfterReads = 16;
private int
numReads;
protected
ByteToMessageDecoder() {
ensureNotSharable();
}
/**
* If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
*
* Default is {@code false} as this has performance impacts.
*/
public void
setSingleDecode(boolean
singleDecode) {
this.
singleDecode =
singleDecode;
}
/**
* If {@code true} then only one message is decoded on each
* {@link #channelRead(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*/
public boolean
isSingleDecode() {
return
singleDecode;
}
/**
* Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
*/
public void
setCumulator(
Cumulator cumulator) {
if (
cumulator == null) {
throw new
NullPointerException("cumulator");
}
this.
cumulator =
cumulator;
}
/**
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
* The default is {@code 16}.
*/
public void
setDiscardAfterReads(int
discardAfterReads) {
if (
discardAfterReads <= 0) {
throw new
IllegalArgumentException("discardAfterReads must be > 0");
}
this.
discardAfterReads =
discardAfterReads;
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you must use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int
actualReadableBytes() {
return
internalBuffer().
readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected
ByteBuf internalBuffer() {
if (
cumulation != null) {
return
cumulation;
} else {
return
Unpooled.
EMPTY_BUFFER;
}
}
@
Override
public final void
handlerRemoved(
ChannelHandlerContext ctx) throws
Exception {
if (
decodeState ==
STATE_CALLING_CHILD_DECODE) {
decodeState =
STATE_HANDLER_REMOVED_PENDING;
return;
}
ByteBuf buf =
cumulation;
if (
buf != null) {
// Directly set this to null so we are sure we not access it in any other method here anymore.
cumulation = null;
int
readable =
buf.
readableBytes();
if (
readable > 0) {
ByteBuf bytes =
buf.
readBytes(
readable);
buf.
release();
ctx.
fireChannelRead(
bytes);
} else {
buf.
release();
}
numReads = 0;
ctx.
fireChannelReadComplete();
}
handlerRemoved0(
ctx);
}
/**
* Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
* events anymore.
*/
protected void
handlerRemoved0(
ChannelHandlerContext ctx) throws
Exception { }
@
Override
public void
channelRead(
ChannelHandlerContext ctx,
Object msg) throws
Exception {
if (
msg instanceof
ByteBuf) {
CodecOutputList out =
CodecOutputList.
newInstance();
try {
ByteBuf data = (
ByteBuf)
msg;
first =
cumulation == null;
if (
first) {
cumulation =
data;
} else {
cumulation =
cumulator.
cumulate(
ctx.
alloc(),
cumulation,
data);
}
callDecode(
ctx,
cumulation,
out);
} catch (
DecoderException e) {
throw
e;
} catch (
Exception e) {
throw new
DecoderException(
e);
} finally {
if (
cumulation != null && !
cumulation.
isReadable()) {
numReads = 0;
cumulation.
release();
cumulation = null;
} else if (++
numReads >=
discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int
size =
out.
size();
decodeWasNull = !
out.
insertSinceRecycled();
fireChannelRead(
ctx,
out,
size);
out.
recycle();
}
} else {
ctx.
fireChannelRead(
msg);
}
}
/**
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
*/
static void
fireChannelRead(
ChannelHandlerContext ctx,
List<
Object>
msgs, int
numElements) {
if (
msgs instanceof
CodecOutputList) {
fireChannelRead(
ctx, (
CodecOutputList)
msgs,
numElements);
} else {
for (int
i = 0;
i <
numElements;
i++) {
ctx.
fireChannelRead(
msgs.
get(
i));
}
}
}
/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void
fireChannelRead(
ChannelHandlerContext ctx,
CodecOutputList msgs, int
numElements) {
for (int
i = 0;
i <
numElements;
i ++) {
ctx.
fireChannelRead(
msgs.
getUnsafe(
i));
}
}
@
Override
public void
channelReadComplete(
ChannelHandlerContext ctx) throws
Exception {
numReads = 0;
discardSomeReadBytes();
if (
decodeWasNull) {
decodeWasNull = false;
if (!
ctx.
channel().
config().
isAutoRead()) {
ctx.
read();
}
}
ctx.
fireChannelReadComplete();
}
protected final void
discardSomeReadBytes() {
if (
cumulation != null && !
first &&
cumulation.
refCnt() == 1) {
// discard some bytes if possible to make more room in the
// buffer but only if the refCnt == 1 as otherwise the user may have
// used slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
cumulation.
discardSomeReadBytes();
}
}
@
Override
public void
channelInactive(
ChannelHandlerContext ctx) throws
Exception {
channelInputClosed(
ctx, true);
}
@
Override
public void
userEventTriggered(
ChannelHandlerContext ctx,
Object evt) throws
Exception {
if (
evt instanceof
ChannelInputShutdownEvent) {
// The decodeLast method is invoked when a channelInactive event is encountered.
// This method is responsible for ending requests in some situations and must be called
// when the input has been shutdown.
channelInputClosed(
ctx, false);
}
super.userEventTriggered(
ctx,
evt);
}
private void
channelInputClosed(
ChannelHandlerContext ctx, boolean
callChannelInactive) throws
Exception {
CodecOutputList out =
CodecOutputList.
newInstance();
try {
channelInputClosed(
ctx,
out);
} catch (
DecoderException e) {
throw
e;
} catch (
Exception e) {
throw new
DecoderException(
e);
} finally {
try {
if (
cumulation != null) {
cumulation.
release();
cumulation = null;
}
int
size =
out.
size();
fireChannelRead(
ctx,
out,
size);
if (
size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.
fireChannelReadComplete();
}
if (
callChannelInactive) {
ctx.
fireChannelInactive();
}
} finally {
// Recycle in all cases
out.
recycle();
}
}
}
/**
* Called when the input of the channel was closed which may be because it changed to inactive or because of
* {@link ChannelInputShutdownEvent}.
*/
void
channelInputClosed(
ChannelHandlerContext ctx,
List<
Object>
out) throws
Exception {
if (
cumulation != null) {
callDecode(
ctx,
cumulation,
out);
decodeLast(
ctx,
cumulation,
out);
} else {
decodeLast(
ctx,
Unpooled.
EMPTY_BUFFER,
out);
}
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void
callDecode(
ChannelHandlerContext ctx,
ByteBuf in,
List<
Object>
out) {
try {
while (
in.
isReadable()) {
int
outSize =
out.
size();
if (
outSize > 0) {
fireChannelRead(
ctx,
out,
outSize);
out.
clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (
ctx.
isRemoved()) {
break;
}
outSize = 0;
}
int
oldInputLength =
in.
readableBytes();
decodeRemovalReentryProtection(
ctx,
in,
out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (
ctx.
isRemoved()) {
break;
}
if (
outSize ==
out.
size()) {
if (
oldInputLength ==
in.
readableBytes()) {
break;
} else {
continue;
}
}
if (
oldInputLength ==
in.
readableBytes()) {
throw new
DecoderException(
StringUtil.
simpleClassName(
getClass()) +
".decode() did not read anything but decoded a message.");
}
if (
isSingleDecode()) {
break;
}
}
} catch (
DecoderException e) {
throw
e;
} catch (
Exception cause) {
throw new
DecoderException(
cause);
}
}
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
protected abstract void
decode(
ChannelHandlerContext ctx,
ByteBuf in,
List<
Object>
out) throws
Exception;
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
final void
decodeRemovalReentryProtection(
ChannelHandlerContext ctx,
ByteBuf in,
List<
Object>
out)
throws
Exception {
decodeState =
STATE_CALLING_CHILD_DECODE;
try {
decode(
ctx,
in,
out);
} finally {
boolean
removePending =
decodeState ==
STATE_HANDLER_REMOVED_PENDING;
decodeState =
STATE_INIT;
if (
removePending) {
handlerRemoved(
ctx);
}
}
}
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void
decodeLast(
ChannelHandlerContext ctx,
ByteBuf in,
List<
Object>
out) throws
Exception {
if (
in.
isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(
ctx,
in,
out);
}
}
static
ByteBuf expandCumulation(
ByteBufAllocator alloc,
ByteBuf cumulation, int
readable) {
ByteBuf oldCumulation =
cumulation;
cumulation =
alloc.
buffer(
oldCumulation.
readableBytes() +
readable);
cumulation.
writeBytes(
oldCumulation);
oldCumulation.
release();
return
cumulation;
}
/**
* Cumulate {@link ByteBuf}s.
*/
public interface
Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(
ByteBufAllocator alloc,
ByteBuf cumulation,
ByteBuf in);
}
}