/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.
ArrayList;
import java.util.
List;
import static java.lang.
Math.max;
import static java.lang.
Math.min;
/**
 * The {@link RecvByteBufAllocator} that automatically increases and
 * decreases the predicted buffer size on feedback.
 * <p>
 * It gradually increases the expected number of readable bytes if the previous
 * read fully filled the allocated buffer. It gradually decreases the expected
 * number of readable bytes if the read operation was not able to fill a certain
 * amount of the allocated buffer two times consecutively. Otherwise, it keeps
 * returning the same prediction.
 * <p>
 * Predictions are always taken from a fixed table of sizes: multiples of 16
 * from 16 up to 496, followed by powers of two from 512 up to 2<sup>30</sup>.
 */
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    /** Number of table slots the prediction moves up when a read filled the buffer. */
    private static final int INDEX_INCREMENT = 4;
    /** Number of table slots the prediction moves down when reads keep coming up short. */
    private static final int INDEX_DECREMENT = 1;

    /** Ascending table of all buffer sizes this allocator can predict. */
    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        // Fine-grained steps of 16 bytes for small buffers: 16, 32, ..., 496.
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        // Doubling steps for larger buffers: 512, 1024, ..., 2^30.
        // The loop ends when the shift overflows and i becomes negative.
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */
    // Must be declared after the static initializer above: the constructor reads SIZE_TABLE.
    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    /**
     * Binary-searches {@link #SIZE_TABLE} for {@code size}.
     * <p>
     * When {@code size} is present in the table its index is returned. Otherwise the index of a
     * neighbouring entry is returned, which may be the entry just below <em>or</em> just above
     * {@code size}; callers that need a strict bound must compare {@code SIZE_TABLE[result]} with
     * {@code size} themselves. Sizes below the smallest entry map to index {@code 0} and sizes above
     * the largest entry map to the last index.
     *
     * @param size the buffer size to look up
     * @return an index into {@link #SIZE_TABLE}, always within its bounds
     */
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            // low < high here, so mid < high and mid + 1 is a valid index.
            int mid = (low + high) >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                // a < size <= b: pick the upper neighbour.
                return mid + 1;
            }
        }
    }

    /**
     * Per-channel handle that tracks the current position in {@link #SIZE_TABLE} and moves it
     * up or down according to how many bytes the recent reads produced.
     */
    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        /** Current position in {@link #SIZE_TABLE}. */
        private int index;
        /** The size returned by {@link #guess()}; always {@code SIZE_TABLE[index]}. */
        private int nextReceiveBufferSize;
        /** Set after one "short" read; a second short read while it is set triggers a shrink. */
        private boolean decreaseNow;

        /**
         * @param minIndex lowest table index the prediction may shrink to
         * @param maxIndex highest table index the prediction may grow to
         * @param initial the initial buffer size; it is mapped to a table entry via
         *                {@link #getSizeTableIndex(int)}
         */
        public HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public void lastBytesRead(int bytes) {
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            if (bytes == attemptedBytesRead()) {
                record(bytes);
            }
            super.lastBytesRead(bytes);
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        /**
         * Feeds one observation into the predictor.
         * <ul>
         * <li>If {@code actualReadBytes} is at or below the table entry
         * {@code INDEX_DECREMENT + 1} slots under the current one, the first such observation only
         * arms {@link #decreaseNow}; the next one while it is armed moves the prediction down by
         * {@code INDEX_DECREMENT} slots (not below {@code minIndex}).</li>
         * <li>Otherwise, if {@code actualReadBytes} reached the current prediction, the prediction
         * moves up by {@code INDEX_INCREMENT} slots (not above {@code maxIndex}).</li>
         * <li>Otherwise nothing changes.</li>
         * </ul>
         *
         * @param actualReadBytes the number of bytes observed
         */
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }

    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    /**
     * Creates a new predictor with the default parameters. With the default
     * parameters, the expected buffer size starts from {@code 1024}, does not
     * go down below {@code 64}, and does not go up above {@code 65536}.
     */
    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    /**
     * Creates a new predictor with the specified parameters.
     * <p>
     * The bounds are mapped onto the internal size table. If {@code maximum} is smaller than the
     * smallest table entry, or {@code minimum} is larger than the largest table entry, the
     * corresponding bound is pinned to that table entry so the predictor always has a valid size
     * to return.
     *
     * @param minimum the inclusive lower bound of the expected buffer size
     * @param initial the initial buffer size when no feedback was received
     * @param maximum the inclusive upper bound of the expected buffer size
     * @throws IllegalArgumentException if {@code minimum <= 0}, {@code initial < minimum}
     *                                  or {@code maximum < initial}
     */
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        if (minimum <= 0) {
            throw new IllegalArgumentException("minimum: " + minimum);
        }
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        final int lastIndex = SIZE_TABLE.length - 1;

        // getSizeTableIndex may land on the entry just below minimum; step up so the lower bound
        // is honoured. Clamp to the last slot: when minimum exceeds every table entry there is no
        // slot above, and an out-of-range index would later fail in HandleImpl.record().
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = min(minIndex + 1, lastIndex);
        } else {
            this.minIndex = minIndex;
        }

        // Likewise step down when the lookup landed above maximum. Clamp to slot 0: when maximum
        // is below every table entry the unclamped result would be -1 and the first growth step
        // would index SIZE_TABLE[-1].
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = max(maxIndex - 1, 0);
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }

    /**
     * Creates a new handle that starts from this allocator's initial size and is bounded by
     * its configured minimum and maximum table indices.
     */
    @SuppressWarnings("deprecation")
    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    /**
     * Delegates to the superclass and returns {@code this} with the narrower type so calls
     * can be chained on an {@link AdaptiveRecvByteBufAllocator}.
     */
    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
}