/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import io.netty.buffer.
ByteBuf;
import io.netty.buffer.
ByteBufHolder;
import io.netty.channel.
Channel;
import io.netty.channel.
ChannelDuplexHandler;
import io.netty.channel.
ChannelConfig;
import io.netty.channel.
ChannelHandlerContext;
import io.netty.channel.
ChannelOutboundBuffer;
import io.netty.channel.
ChannelPromise;
import io.netty.util.
Attribute;
import io.netty.util.
AttributeKey;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.util.concurrent.
TimeUnit;
/**
* <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
* (see {@link GlobalTrafficShapingHandler}) or per session
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
* It allows you to implement an almost real time monitoring of the bandwidth using
* the monitors from {@link TrafficCounter} that will call back every checkInterval
* the method doAccounting of this handler.</p>
*
* <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:</p>
* <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* </ul>
*/
public abstract class
AbstractTrafficShapingHandler extends
ChannelDuplexHandler {
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
AbstractTrafficShapingHandler.class);
/**
* Default delay between two checks: 1s
*/
public static final long
DEFAULT_CHECK_INTERVAL = 1000;
/**
* Default max delay in case of traffic shaping
* (during which no communication will occur).
* Shall be less than TIMEOUT. Here half of "standard" 30s
*/
public static final long
DEFAULT_MAX_TIME = 15000;
/**
* Default max size to not exceed in buffer (write only).
*/
static final long
DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
/**
* Default minimal time to wait
*/
static final long
MINIMAL_WAIT = 10;
/**
* Traffic Counter
*/
protected
TrafficCounter trafficCounter;
/**
* Limit in B/s to apply to write
*/
private volatile long
writeLimit;
/**
* Limit in B/s to apply to read
*/
private volatile long
readLimit;
/**
* Max delay in wait
*/
protected volatile long
maxTime =
DEFAULT_MAX_TIME; // default 15 s
/**
* Delay between two performance snapshots
*/
protected volatile long
checkInterval =
DEFAULT_CHECK_INTERVAL; // default 1 s
static final
AttributeKey<
Boolean>
READ_SUSPENDED =
AttributeKey
.
valueOf(
AbstractTrafficShapingHandler.class.
getName() + ".READ_SUSPENDED");
static final
AttributeKey<
Runnable>
REOPEN_TASK =
AttributeKey.
valueOf(
AbstractTrafficShapingHandler.class
.
getName() + ".REOPEN_TASK");
/**
* Max time to delay before proposing to stop writing new objects from next handlers
*/
volatile long
maxWriteDelay = 4 *
DEFAULT_CHECK_INTERVAL; // default 4 s
/**
* Max size in the list before proposing to stop writing new objects from next handlers
*/
volatile long
maxWriteSize =
DEFAULT_MAX_SIZE; // default 4MB
/**
* Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
* Set in final constructor. Must be between 1 and 31
*/
final int
userDefinedWritabilityIndex;
/**
* Default value for Channel UserDefinedWritability index
*/
static final int
CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
/**
* Default value for Global UserDefinedWritability index
*/
static final int
GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
/**
* Default value for GlobalChannel UserDefinedWritability index
*/
static final int
GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
/**
* @param newTrafficCounter
* the TrafficCounter to set
*/
void
setTrafficCounter(
TrafficCounter newTrafficCounter) {
trafficCounter =
newTrafficCounter;
}
/**
* @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
* For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
* for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
* for GlobalChannel TSH it is defined as
* {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
*/
protected int
userDefinedWritabilityIndex() {
return
CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
}
/**
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
* Must be positive.
*/
protected
AbstractTrafficShapingHandler(long
writeLimit, long
readLimit, long
checkInterval, long
maxTime) {
if (
maxTime <= 0) {
throw new
IllegalArgumentException("maxTime must be positive");
}
userDefinedWritabilityIndex =
userDefinedWritabilityIndex();
this.
writeLimit =
writeLimit;
this.
readLimit =
readLimit;
this.
checkInterval =
checkInterval;
this.
maxTime =
maxTime;
}
/**
* Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
protected
AbstractTrafficShapingHandler(long
writeLimit, long
readLimit, long
checkInterval) {
this(
writeLimit,
readLimit,
checkInterval,
DEFAULT_MAX_TIME);
}
/**
* Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
protected
AbstractTrafficShapingHandler(long
writeLimit, long
readLimit) {
this(
writeLimit,
readLimit,
DEFAULT_CHECK_INTERVAL,
DEFAULT_MAX_TIME);
}
/**
* Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*/
protected
AbstractTrafficShapingHandler() {
this(0, 0,
DEFAULT_CHECK_INTERVAL,
DEFAULT_MAX_TIME);
}
/**
* Constructor using NO LIMIT and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
protected
AbstractTrafficShapingHandler(long
checkInterval) {
this(0, 0,
checkInterval,
DEFAULT_MAX_TIME);
}
/**
* Change the underlying limitations and check interval.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param newWriteLimit The new write limit (in bytes)
* @param newReadLimit The new read limit (in bytes)
* @param newCheckInterval The new check interval (in milliseconds)
*/
public void
configure(long
newWriteLimit, long
newReadLimit,
long
newCheckInterval) {
configure(
newWriteLimit,
newReadLimit);
configure(
newCheckInterval);
}
/**
* Change the underlying limitations.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param newWriteLimit The new write limit (in bytes)
* @param newReadLimit The new read limit (in bytes)
*/
public void
configure(long
newWriteLimit, long
newReadLimit) {
writeLimit =
newWriteLimit;
readLimit =
newReadLimit;
if (
trafficCounter != null) {
trafficCounter.
resetAccounting(
TrafficCounter.
milliSecondFromNano());
}
}
/**
* Change the check interval.
*
* @param newCheckInterval The new check interval (in milliseconds)
*/
public void
configure(long
newCheckInterval) {
checkInterval =
newCheckInterval;
if (
trafficCounter != null) {
trafficCounter.
configure(
checkInterval);
}
}
/**
* @return the writeLimit
*/
public long
getWriteLimit() {
return
writeLimit;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param writeLimit the writeLimit to set
*/
public void
setWriteLimit(long
writeLimit) {
this.
writeLimit =
writeLimit;
if (
trafficCounter != null) {
trafficCounter.
resetAccounting(
TrafficCounter.
milliSecondFromNano());
}
}
/**
* @return the readLimit
*/
public long
getReadLimit() {
return
readLimit;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param readLimit the readLimit to set
*/
public void
setReadLimit(long
readLimit) {
this.
readLimit =
readLimit;
if (
trafficCounter != null) {
trafficCounter.
resetAccounting(
TrafficCounter.
milliSecondFromNano());
}
}
/**
* @return the checkInterval
*/
public long
getCheckInterval() {
return
checkInterval;
}
/**
* @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
*/
public void
setCheckInterval(long
checkInterval) {
this.
checkInterval =
checkInterval;
if (
trafficCounter != null) {
trafficCounter.
configure(
checkInterval);
}
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param maxTime
* Max delay in wait, shall be less than TIME OUT in related protocol.
* Must be positive.
*/
public void
setMaxTimeWait(long
maxTime) {
if (
maxTime <= 0) {
throw new
IllegalArgumentException("maxTime must be positive");
}
this.
maxTime =
maxTime;
}
/**
* @return the max delay in wait to prevent TIME OUT
*/
public long
getMaxTimeWait() {
return
maxTime;
}
/**
* @return the maxWriteDelay
*/
public long
getMaxWriteDelay() {
return
maxWriteDelay;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
* Must be positive.
*/
public void
setMaxWriteDelay(long
maxWriteDelay) {
if (
maxWriteDelay <= 0) {
throw new
IllegalArgumentException("maxWriteDelay must be positive");
}
this.
maxWriteDelay =
maxWriteDelay;
}
/**
* @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
*/
public long
getMaxWriteSize() {
return
maxWriteSize;
}
/**
* <p>Note that this limit is a best effort on memory limitation to prevent Out Of
* Memory Exception. To ensure it works, the handler generating the write should
* use one of the way provided by Netty to handle the capacity:</p>
* <p>- the {@code Channel.isWritable()} property and the corresponding
* {@code channelWritabilityChanged()}</p>
* <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
*
* @param maxWriteSize the maximum Write Size allowed in the buffer
* per channel before write suspended is set,
* default being {@value #DEFAULT_MAX_SIZE} bytes.
*/
public void
setMaxWriteSize(long
maxWriteSize) {
this.
maxWriteSize =
maxWriteSize;
}
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
*
* @param counter
* the TrafficCounter that computes its performance
*/
protected void
doAccounting(
TrafficCounter counter) {
// NOOP by default
}
/**
* Class to implement setReadable at fix time
*/
static final class
ReopenReadTimerTask implements
Runnable {
final
ChannelHandlerContext ctx;
ReopenReadTimerTask(
ChannelHandlerContext ctx) {
this.
ctx =
ctx;
}
@
Override
public void
run() {
Channel channel =
ctx.
channel();
ChannelConfig config =
channel.
config();
if (!
config.
isAutoRead() &&
isHandlerActive(
ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (
logger.
isDebugEnabled()) {
logger.
debug("Not unsuspend: " +
config.
isAutoRead() + ':' +
isHandlerActive(
ctx));
}
channel.
attr(
READ_SUSPENDED).
set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (
logger.
isDebugEnabled()) {
if (
config.
isAutoRead() && !
isHandlerActive(
ctx)) {
if (
logger.
isDebugEnabled()) {
logger.
debug("Unsuspend: " +
config.
isAutoRead() + ':' +
isHandlerActive(
ctx));
}
} else {
if (
logger.
isDebugEnabled()) {
logger.
debug("Normal unsuspend: " +
config.
isAutoRead() + ':'
+
isHandlerActive(
ctx));
}
}
}
channel.
attr(
READ_SUSPENDED).
set(false);
config.
setAutoRead(true);
channel.
read();
}
if (
logger.
isDebugEnabled()) {
logger.
debug("Unsuspend final status => " +
config.
isAutoRead() + ':'
+
isHandlerActive(
ctx));
}
}
}
/**
* Release the Read suspension
*/
void
releaseReadSuspended(
ChannelHandlerContext ctx) {
Channel channel =
ctx.
channel();
channel.
attr(
READ_SUSPENDED).
set(false);
channel.
config().
setAutoRead(true);
}
@
Override
public void
channelRead(final
ChannelHandlerContext ctx, final
Object msg) throws
Exception {
long
size =
calculateSize(
msg);
long
now =
TrafficCounter.
milliSecondFromNano();
if (
size > 0) {
// compute the number of ms to wait before reopening the channel
long
wait =
trafficCounter.
readTimeToWait(
size,
readLimit,
maxTime,
now);
wait =
checkWaitReadTime(
ctx,
wait,
now);
if (
wait >=
MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active
Channel channel =
ctx.
channel();
ChannelConfig config =
channel.
config();
if (
logger.
isDebugEnabled()) {
logger.
debug("Read suspend: " +
wait + ':' +
config.
isAutoRead() + ':'
+
isHandlerActive(
ctx));
}
if (
config.
isAutoRead() &&
isHandlerActive(
ctx)) {
config.
setAutoRead(false);
channel.
attr(
READ_SUSPENDED).
set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<
Runnable>
attr =
channel.
attr(
REOPEN_TASK);
Runnable reopenTask =
attr.
get();
if (
reopenTask == null) {
reopenTask = new
ReopenReadTimerTask(
ctx);
attr.
set(
reopenTask);
}
ctx.
executor().
schedule(
reopenTask,
wait,
TimeUnit.
MILLISECONDS);
if (
logger.
isDebugEnabled()) {
logger.
debug("Suspend final status => " +
config.
isAutoRead() + ':'
+
isHandlerActive(
ctx) + " will reopened at: " +
wait);
}
}
}
}
informReadOperation(
ctx,
now);
ctx.
fireChannelRead(
msg);
}
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param wait the wait delay computed in ms
* @param now the relative now time in ms
* @return the wait to use according to the context
*/
long
checkWaitReadTime(final
ChannelHandlerContext ctx, long
wait, final long
now) {
// no change by default
return
wait;
}
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param now the relative now time in ms
*/
void
informReadOperation(final
ChannelHandlerContext ctx, final long
now) {
// default noop
}
protected static boolean
isHandlerActive(
ChannelHandlerContext ctx) {
Boolean suspended =
ctx.
channel().
attr(
READ_SUSPENDED).
get();
return
suspended == null ||
Boolean.
FALSE.
equals(
suspended);
}
@
Override
public void
read(
ChannelHandlerContext ctx) {
if (
isHandlerActive(
ctx)) {
// For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
ctx.
read();
}
}
@
Override
public void
write(final
ChannelHandlerContext ctx, final
Object msg, final
ChannelPromise promise)
throws
Exception {
long
size =
calculateSize(
msg);
long
now =
TrafficCounter.
milliSecondFromNano();
if (
size > 0) {
// compute the number of ms to wait before continue with the channel
long
wait =
trafficCounter.
writeTimeToWait(
size,
writeLimit,
maxTime,
now);
if (
wait >=
MINIMAL_WAIT) {
if (
logger.
isDebugEnabled()) {
logger.
debug("Write suspend: " +
wait + ':' +
ctx.
channel().
config().
isAutoRead() + ':'
+
isHandlerActive(
ctx));
}
submitWrite(
ctx,
msg,
size,
wait,
now,
promise);
return;
}
}
// to maintain order of write
submitWrite(
ctx,
msg,
size, 0,
now,
promise);
}
@
Deprecated
protected void
submitWrite(final
ChannelHandlerContext ctx, final
Object msg,
final long
delay, final
ChannelPromise promise) {
submitWrite(
ctx,
msg,
calculateSize(
msg),
delay,
TrafficCounter.
milliSecondFromNano(),
promise);
}
abstract void
submitWrite(
ChannelHandlerContext ctx,
Object msg, long
size, long
delay, long
now,
ChannelPromise promise);
@
Override
public void
channelRegistered(
ChannelHandlerContext ctx) throws
Exception {
setUserDefinedWritability(
ctx, true);
super.channelRegistered(
ctx);
}
void
setUserDefinedWritability(
ChannelHandlerContext ctx, boolean
writable) {
ChannelOutboundBuffer cob =
ctx.
channel().
unsafe().
outboundBuffer();
if (
cob != null) {
cob.
setUserDefinedWritability(
userDefinedWritabilityIndex,
writable);
}
}
/**
* Check the writability according to delay and size for the channel.
* Set if necessary setUserDefinedWritability status.
* @param delay the computed delay
* @param queueSize the current queueSize
*/
void
checkWriteSuspend(
ChannelHandlerContext ctx, long
delay, long
queueSize) {
if (
queueSize >
maxWriteSize ||
delay >
maxWriteDelay) {
setUserDefinedWritability(
ctx, false);
}
}
/**
* Explicitly release the Write suspended status.
*/
void
releaseWriteSuspended(
ChannelHandlerContext ctx) {
setUserDefinedWritability(
ctx, true);
}
/**
* @return the current TrafficCounter (if
* channel is still connected)
*/
public
TrafficCounter trafficCounter() {
return
trafficCounter;
}
@
Override
public
String toString() {
StringBuilder builder = new
StringBuilder(290)
.
append("TrafficShaping with Write Limit: ").
append(
writeLimit)
.
append(" Read Limit: ").
append(
readLimit)
.
append(" CheckInterval: ").
append(
checkInterval)
.
append(" maxDelay: ").
append(
maxWriteDelay)
.
append(" maxSize: ").
append(
maxWriteSize)
.
append(" and Counter: ");
if (
trafficCounter != null) {
builder.
append(
trafficCounter);
} else {
builder.
append("none");
}
return
builder.
toString();
}
/**
* Calculate the size of the given {@link Object}.
*
* This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
* @param msg the msg for which the size should be calculated.
* @return size the size of the msg or {@code -1} if unknown.
*/
protected long
calculateSize(
Object msg) {
if (
msg instanceof
ByteBuf) {
return ((
ByteBuf)
msg).
readableBytes();
}
if (
msg instanceof
ByteBufHolder) {
return ((
ByteBufHolder)
msg).
content().
readableBytes();
}
return -1;
}
}