/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.
Channel;
import io.netty.channel.
ChannelException;
import io.netty.channel.
EventLoop;
import io.netty.channel.
EventLoopException;
import io.netty.channel.
SelectStrategy;
import io.netty.channel.
SingleThreadEventLoop;
import io.netty.util.
IntSupplier;
import io.netty.util.concurrent.
RejectedExecutionHandler;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
ReflectionUtil;
import io.netty.util.internal.
SystemPropertyUtil;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.io.
IOException;
import java.lang.reflect.
Field;
import java.nio.channels.
CancelledKeyException;
import java.nio.channels.
SelectableChannel;
import java.nio.channels.
SelectionKey;
import java.nio.channels.
Selector;
import java.nio.channels.spi.
SelectorProvider;
import java.security.
AccessController;
import java.security.
PrivilegedAction;
import java.util.
ArrayList;
import java.util.
Collection;
import java.util.
Iterator;
import java.util.
Queue;
import java.util.
Set;
import java.util.concurrent.
Callable;
import java.util.concurrent.
Executor;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicBoolean;
/**
* {@link SingleThreadEventLoop} implementation which registers the {@link Channel}s to a
* {@link Selector} and so does the multiplexing of these in the event loop.
*
*/
public final class
NioEventLoop extends
SingleThreadEventLoop {
/** Logger shared by all instances of this class. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);

/**
 * Number of cancelled keys after which {@code cancel(SelectionKey)} flags that another
 * non-blocking select is needed. XXX Hard-coded value, but won't need customization.
 */
private static final int CLEANUP_INTERVAL = 256;

/** Value of {@code -Dio.netty.noKeySetOptimization}; {@code false} when the property is not set. */
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
    SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/**
 * Smallest accepted auto-rebuild threshold, and the count above which premature selector
 * returns are reported at debug level.
 */
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

/**
 * Number of consecutive premature selector returns that triggers a selector rebuild;
 * {@code 0} disables the rebuild. Assigned by the static initializer.
 */
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
/**
 * Supplier passed to the {@link SelectStrategy}; every call performs a non-blocking
 * {@code selectNow()} on this event loop and returns its result.
 */
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
// Workaround for JDK NIO bug.
//
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static {
    final String bugLevelProperty = "sun.nio.ch.bugLevel";
    // Only define the property when nobody has set it yet.
    if (SystemPropertyUtil.get(bugLevelProperty) == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelProperty, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            logger.debug("Unable to get/set System Property: " + bugLevelProperty, e);
        }
    }

    // A configured threshold below the minimum switches the auto-rebuild off (0).
    final int configuredThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
        configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
/**
 * The NIO {@link Selector}.
 */
private Selector selector;

/** The {@link Selector} exactly as it was returned by the {@link SelectorProvider}. */
private Selector unwrappedSelector;

/** Key set installed into the selector by {@code openSelector()}; {@code null} if none was installed. */
private SelectedSelectionKeySet selectedKeys;

/** Provider used to open the selector(s) of this event loop. */
private final SelectorProvider provider;

/**
 * Boolean that determines whether a blocked Selector.select should
 * break out of its selection process. In our case we use a timeout for
 * the select method and the select method will block for that time unless
 * woken up.
 */
private final AtomicBoolean wakenUp = new AtomicBoolean();

/** Strategy consulted at the start of every loop iteration. */
private final SelectStrategy selectStrategy;

/** Desired percentage of time spent on I/O, see {@code setIoRatio(int)}. */
private volatile int ioRatio = 50;

/** Keys cancelled through {@code cancel(SelectionKey)} since the counter was last reset. */
private int cancelledKeys;

/** Set once {@code CLEANUP_INTERVAL} keys were cancelled; cleared by {@code selectAgain()}. */
private boolean needsToSelectAgain;
/**
 * Creates the event loop and opens its {@link Selector} through the given provider.
 *
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is {@code null}
 * @throws ChannelException if the selector cannot be opened
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
        SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }

    // The provider must be stored first because openSelector() reads it.
    provider = selectorProvider;
    final SelectorTuple opened = openSelector();
    selector = opened.selector;
    unwrappedSelector = opened.unwrappedSelector;
    selectStrategy = strategy;
}
/**
 * Immutable pair of the selector obtained from the provider and the selector the event loop
 * operates on. Both fields hold the same instance when no wrapper is used.
 */
private static final class SelectorTuple {
    final Selector unwrappedSelector;
    final Selector selector;

    /** Creates a tuple in which the unwrapped selector is also the selector to operate on. */
    SelectorTuple(Selector unwrappedSelector) {
        this(unwrappedSelector, unwrappedSelector);
    }

    /** Creates a tuple from an unwrapped selector and the selector to operate on. */
    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}
/**
 * Opens a new {@link Selector} through {@code provider} and, unless disabled by
 * {@code DISABLE_KEYSET_OPTIMIZATION}, tries to replace its {@code selectedKeys} and
 * {@code publicSelectedKeys} fields with a {@link SelectedSelectionKeySet} via reflection.
 *
 * <p>When the reflective replacement is attempted, the {@code selectedKeys} field of this event
 * loop is updated as a side effect: it holds the installed set on success and {@code null} when
 * the attempt yields an exception.
 *
 * @return the opened selector paired with a {@link SelectedSelectionKeySetSelector} when the
 *         replacement succeeded, otherwise paired with itself
 * @throws ChannelException if the provider fails to open a selector
 */
private SelectorTuple openSelector() {
    final Selector rawSelector;
    try {
        rawSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return new SelectorTuple(rawSelector);
    }

    // Look up the selector implementation class; a lookup failure is returned as the Throwable itself.
    final Object implLookup = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(implLookup instanceof Class)) {
        if (implLookup instanceof Throwable) {
            Throwable t = (Throwable) implLookup;
            logger.trace("failed to instrument a special java.util.Set into: {}", rawSelector, t);
        }
        return new SelectorTuple(rawSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) implLookup;
    // ensure the current selector implementation is what we can instrument.
    if (!selectorImplClass.isAssignableFrom(rawSelector.getClass())) {
        return new SelectorTuple(rawSelector);
    }

    final SelectedSelectionKeySet keySet = new SelectedSelectionKeySet();

    // Swap both key-set fields of the selector; any failure is returned instead of thrown.
    final Object instrumentationResult = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                selectedKeysField.set(rawSelector, keySet);
                publicSelectedKeysField.set(rawSelector, keySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (instrumentationResult instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) instrumentationResult;
        logger.trace("failed to instrument a special java.util.Set into: {}", rawSelector, e);
        return new SelectorTuple(rawSelector);
    }

    selectedKeys = keySet;
    logger.trace("instrumented a special java.util.Set into: {}", rawSelector);
    return new SelectorTuple(rawSelector, new SelectedSelectionKeySetSelector(rawSelector, keySet));
}
/**
 * Returns the {@link SelectorProvider} this {@link NioEventLoop} uses to obtain its {@link Selector}.
 *
 * @return the provider passed to the constructor
 */
public SelectorProvider selectorProvider() {
    return provider;
}
/**
 * Creates the task queue for this event loop: an unbounded MPSC queue when
 * {@code maxPendingTasks} is {@link Integer#MAX_VALUE}, otherwise one created for that capacity.
 */
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    if (maxPendingTasks == Integer.MAX_VALUE) {
        return PlatformDependent.<Runnable>newMpscQueue();
    }
    return PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
/**
 * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
 * of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
 * be executed by this event loop when the {@link SelectableChannel} is ready.
 *
 * @param ch the channel to register
 * @param interestOps the non-zero interest set; must be a subset of {@code ch.validOps()}
 * @param task the task attached to the resulting selection key
 * @throws NullPointerException if {@code ch} or {@code task} is {@code null}
 * @throws IllegalArgumentException if {@code interestOps} is zero or contains unsupported operations
 * @throws IllegalStateException if this event loop is already shut down
 * @throws EventLoopException if the underlying registration fails
 */
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    // The validation order below determines which exception a caller sees first.
    if (ch == null) {
        throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    try {
        ch.register(selector, interestOps, task);
    } catch (Exception cause) {
        throw new EventLoopException("failed to register a channel", cause);
    }
}
/**
 * Returns the percentage of the desired amount of time spent for I/O in the event loop.
 *
 * @return the currently configured I/O ratio
 */
public int getIoRatio() {
    return this.ioRatio;
}
/**
 * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 *
 * @param ioRatio the new ratio, in the range {@code 1..100}
 * @throws IllegalArgumentException if {@code ioRatio} is outside that range
 */
public void setIoRatio(int ioRatio) {
    final boolean withinRange = ioRatio > 0 && ioRatio <= 100;
    if (!withinRange) {
        throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
}
/**
 * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
 * around the infamous epoll 100% CPU bug.
 *
 * <p>When invoked from a thread other than the event loop thread, the rebuild is submitted as a
 * task via {@code execute(...)} instead of being performed by the caller.
 */
public void rebuildSelector() {
    if (inEventLoop()) {
        rebuildSelector0();
        return;
    }

    execute(new Runnable() {
        @Override
        public void run() {
            rebuildSelector0();
        }
    });
}
/**
 * Opens a new selector, re-registers every valid key of the current selector with it, switches
 * this event loop over to the new selector and finally closes the old one.
 *
 * <p>Does nothing when there is no current selector or when the new selector cannot be opened.
 * A channel whose re-registration fails is closed; for a {@link NioTask} attachment the failure
 * is reported through {@code channelUnregistered}.
 */
private void rebuildSelector0() {
    final Selector previous = selector;
    if (previous == null) {
        return;
    }

    final SelectorTuple replacement;
    try {
        replacement = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int migrated = 0;
    for (SelectionKey oldKey : previous.keys()) {
        final Object attachment = oldKey.attachment();
        try {
            // Skip keys that are no longer valid or whose channel already has a key on the new selector.
            if (!oldKey.isValid() || oldKey.channel().keyFor(replacement.unwrappedSelector) != null) {
                continue;
            }

            // Read the interest set before cancelling the old key.
            final int ops = oldKey.interestOps();
            oldKey.cancel();
            final SelectionKey newKey =
                    oldKey.channel().register(replacement.unwrappedSelector, ops, attachment);
            if (attachment instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) attachment).selectionKey = newKey;
            }
            migrated++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (attachment instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) attachment;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
                invokeChannelUnregistered(task, oldKey, e);
            }
        }
    }

    selector = replacement.selector;
    unwrappedSelector = replacement.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        previous.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + migrated + " channel(s) to the new Selector.");
    }
}
/**
* Main loop of this event loop. Each iteration asks the {@link SelectStrategy} what to do,
* optionally blocks in {@code select(...)}, processes the selected keys and then runs queued
* tasks. The loop only returns once {@code isShuttingDown()} is true and
* {@code confirmShutdown()} returns {@code true}.
*
* Any {@link Throwable} raised by an iteration is passed to {@code handleLoopException(...)}
* and the loop continues.
*/
@
Override
protected void
run() {
for (;;) {
try {
// The strategy is given a way to do a non-blocking select and whether tasks are pending.
switch (
selectStrategy.
calculateStrategy(
selectNowSupplier,
hasTasks())) {
case
SelectStrategy.
CONTINUE:
// Start the next iteration right away; the shutdown check below is skipped as well.
continue;
case
SelectStrategy.
SELECT:
// Reset 'wakenUp' to false and hand its previous value to select(...).
select(
wakenUp.
getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (
wakenUp.
get()) {
selector.
wakeup();
}
// fall through
default:
}
// Reset the cancelled-key bookkeeping before processing this round's keys.
cancelledKeys = 0;
needsToSelectAgain = false;
// Read the volatile field once so the whole iteration uses one consistent value.
final int
ioRatio = this.
ioRatio;
if (
ioRatio == 100) {
// With a ratio of 100, runAllTasks() is called without a time budget argument.
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long
ioStartTime =
System.
nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long
ioTime =
System.
nanoTime() -
ioStartTime;
// The task budget is the measured I/O time scaled by (100 - ioRatio) / ioRatio.
runAllTasks(
ioTime * (100 -
ioRatio) /
ioRatio);
}
}
} catch (
Throwable t) {
handleLoopException(
t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (
isShuttingDown()) {
closeAll();
if (
confirmShutdown()) {
return;
}
}
} catch (
Throwable t) {
handleLoopException(
t);
}
}
}
/**
 * Logs an unexpected failure of the selector loop at warn level and then sleeps for one second.
 * An interruption of that sleep is ignored.
 *
 * @param t the failure to report
 */
private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    // Prevent possible consecutive immediate failures that lead to
    // excessive CPU consumption.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignored) {
        // Ignore.
    }
}
/**
 * Processes the keys selected by the last select operation, using the array-backed key set when
 * one is installed and the selector's own selected-key set otherwise.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized();
}
/**
 * Closes the selector of this event loop; an {@link IOException} raised while closing is
 * logged at warn level and not propagated.
 */
@Override
protected void cleanup() {
    try {
        selector.close();
    } catch (IOException closeFailure) {
        logger.warn("Failed to close a selector.", closeFailure);
    }
}
/**
 * Cancels the given key and counts the cancellation. Once {@code CLEANUP_INTERVAL} keys have
 * been cancelled, the counter is reset and {@code needsToSelectAgain} is set.
 *
 * @param key the key to cancel
 */
void cancel(SelectionKey key) {
    key.cancel();
    if (++cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}
/**
 * Polls the next task from the superclass and, if {@code needsToSelectAgain} is set,
 * performs a non-blocking select before handing the task back.
 *
 * @return whatever {@code super.pollTask()} returned
 */
@Override
protected Runnable pollTask() {
    final Runnable next = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return next;
}
/**
 * Iterates over the given selected-key set, removes each key from it and dispatches the key to
 * the handler matching its attachment (an {@link AbstractNioChannel} or a {@link NioTask}).
 * When {@code needsToSelectAgain} is set during processing, a non-blocking select is performed
 * and iteration restarts on the selector's current selected-key set.
 *
 * @param selectedKeys the selected keys to process
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
    while (true) {
        final SelectionKey key = keyIterator.next();
        final Object attachment = key.attachment();
        keyIterator.remove();

        if (attachment instanceof AbstractNioChannel) {
            processSelectedKey(key, (AbstractNioChannel) attachment);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            processSelectedKey(key, task);
        }

        if (!keyIterator.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            if (selectedKeys.isEmpty()) {
                break;
            }
            // Create the iterator again to avoid ConcurrentModificationException
            keyIterator = selectedKeys.iterator();
        }
    }
}
/**
 * Walks the array of the installed key set, clears each slot and dispatches the key to the
 * handler matching its attachment. When {@code needsToSelectAgain} is set during processing,
 * the remaining slots are reset, a non-blocking select is performed and the walk restarts
 * from the first slot.
 */
private void processSelectedKeysOptimized() {
    for (int index = 0; index < selectedKeys.size; ++index) {
        final SelectionKey key = selectedKeys.keys[index];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[index] = null;

        final Object attachment = key.attachment();
        if (attachment instanceof AbstractNioChannel) {
            processSelectedKey(key, (AbstractNioChannel) attachment);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            processSelectedKey(key, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(index + 1);

            selectAgain();
            // The loop increment brings the index back to 0 for the next pass.
            index = -1;
        }
    }
}
/**
 * Handles a selected key whose attachment is an {@link AbstractNioChannel}.
 *
 * <p>For an invalid key the channel is closed, but only if it is still registered to this event
 * loop. For a valid key the ready operations are handled in the order connect, write, then
 * read/accept. A {@link CancelledKeyException} raised while doing so closes the channel.
 *
 * @param k the selected key
 * @param ch the channel attached to {@code k}
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

    if (!k.isValid()) {
        final EventLoop owner;
        try {
            owner = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (owner == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        final int readyOps = k.readyOps();

        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        final int readOrAccept = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT;
        if ((readyOps & readOrAccept) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
/**
 * Handles a selected key whose attachment is a {@link NioTask} by invoking
 * {@code channelReady(...)}.
 *
 * <p>If the task throws an {@link Exception}, the key is cancelled and the task is notified
 * through {@code channelUnregistered} with that exception. If the task exits abruptly in any
 * other way, the key is cancelled and the task is notified with a {@code null} cause. If the
 * task completes normally but the key is no longer valid, the task is notified with a
 * {@code null} cause as well.
 *
 * @param k the selected key
 * @param task the task attached to {@code k}
 */
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    // 0 = channelReady() did not complete, 1 = completed normally, 2 = Exception handled.
    int state = 0;
    try {
        task.channelReady(k.channel(), k);
        state = 1;
    } catch (Exception e) {
        k.cancel();
        invokeChannelUnregistered(task, k, e);
        state = 2;
    } finally {
        if (state == 0) {
            k.cancel();
            invokeChannelUnregistered(task, k, null);
        } else if (state == 1) {
            if (!k.isValid()) { // Cancelled by channelReady()
                invokeChannelUnregistered(task, k, null);
            }
        }
    }
}
/**
 * Performs a non-blocking select and then shuts down everything registered with the selector:
 * keys with a {@link NioTask} attachment are cancelled and their task is notified through
 * {@code channelUnregistered}, while {@link AbstractNioChannel}s are collected first and closed
 * after the key set has been fully iterated.
 */
private void closeAll() {
    selectAgain();

    final Set<SelectionKey> registeredKeys = selector.keys();
    final Collection<AbstractNioChannel> toClose =
            new ArrayList<AbstractNioChannel>(registeredKeys.size());

    for (SelectionKey key : registeredKeys) {
        final Object attachment = key.attachment();
        if (!(attachment instanceof AbstractNioChannel)) {
            key.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            invokeChannelUnregistered(task, key, null);
            continue;
        }
        toClose.add((AbstractNioChannel) attachment);
    }

    for (AbstractNioChannel ch : toClose) {
        ch.unsafe().close(ch.unsafe().voidPromise());
    }
}
/**
 * Calls {@code task.channelUnregistered(...)} for the channel of the given key. An
 * {@link Exception} thrown by the task is logged at warn level and not propagated.
 *
 * @param task the task to notify
 * @param k the key whose channel is reported
 * @param cause the cause handed to the task; may be {@code null}
 */
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
    try {
        task.channelUnregistered(k.channel(), cause);
    } catch (Exception failure) {
        logger.warn("Unexpected exception while running NioTask.channelUnregistered()", failure);
    }
}
/**
 * Wakes up the selector when called from outside the event loop, but only if {@code wakenUp}
 * could be switched from {@code false} to {@code true} by this call.
 *
 * @param inEventLoop whether the caller runs in the event loop thread
 */
@Override
protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
        return;
    }
    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
/**
 * Returns the selector held in the {@code unwrappedSelector} field.
 */
Selector unwrappedSelector() {
    return this.unwrappedSelector;
}
/**
 * Performs a non-blocking select. Afterwards, whether or not the select succeeded, the selector
 * is woken up again if {@code wakenUp} is set.
 *
 * @return the value returned by {@link Selector#selectNow()}
 * @throws IOException if the select operation fails
 */
int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}
/**
* Repeatedly calls {@link Selector#select(long)} until something was selected, the loop was
* woken up, a task is pending, the thread was interrupted, or the deadline derived from
* {@code delayNanos(...)} has been reached.
*
* While looping, {@code selectCnt} counts consecutive select calls that returned before their
* timeout elapsed. Once it reaches {@code SELECTOR_AUTO_REBUILD_THRESHOLD} (when that is
* positive) the selector is rebuilt. A {@link CancelledKeyException} is logged at debug level
* and not propagated.
*
* @param oldWakenUp the value {@code wakenUp} held before the caller reset it to {@code false}
* @throws IOException if a select operation fails
*/
private void
select(boolean
oldWakenUp) throws
IOException {
// Work on a local reference; it is re-read below after the selector has been rebuilt.
Selector selector = this.
selector;
try {
int
selectCnt = 0;
long
currentTimeNanos =
System.
nanoTime();
long
selectDeadLineNanos =
currentTimeNanos +
delayNanos(
currentTimeNanos);
for (;;) {
// Time left until the deadline in milliseconds; adding 0.5ms first rounds to the nearest millisecond.
long
timeoutMillis = (
selectDeadLineNanos -
currentTimeNanos + 500000L) / 1000000L;
if (
timeoutMillis <= 0) {
// Deadline reached: do one non-blocking select if no select has happened yet, then leave.
if (
selectCnt == 0) {
selector.
selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (
hasTasks() &&
wakenUp.
compareAndSet(false, true)) {
selector.
selectNow();
selectCnt = 1;
break;
}
int
selectedKeys =
selector.
select(
timeoutMillis);
selectCnt ++;
if (
selectedKeys != 0 ||
oldWakenUp ||
wakenUp.
get() ||
hasTasks() ||
hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
// Thread.interrupted() also clears the interrupt flag of the current thread.
if (
Thread.
interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (
logger.
isDebugEnabled()) {
logger.
debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long
time =
System.
nanoTime();
// Check whether at least timeoutMillis passed since currentTimeNanos was last updated.
if (
time -
TimeUnit.
MILLISECONDS.
toNanos(
timeoutMillis) >=
currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (
SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >=
SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.
warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt,
selector);
rebuildSelector();
// Pick up the selector installed by the rebuild.
selector = this.
selector;
// Select again to populate selectedKeys.
selector.
selectNow();
selectCnt = 1;
break;
}
currentTimeNanos =
time;
}
// Report a run of premature returns that stayed below the rebuild threshold.
if (
selectCnt >
MIN_PREMATURE_SELECTOR_RETURNS) {
if (
logger.
isDebugEnabled()) {
logger.
debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1,
selector);
}
}
} catch (
CancelledKeyException e) {
if (
logger.
isDebugEnabled()) {
logger.
debug(
CancelledKeyException.class.
getSimpleName() + " raised by a Selector {} - JDK bug?",
selector,
e);
}
// Harmless exception - log anyway
}
}
/**
 * Clears {@code needsToSelectAgain} and performs a non-blocking select. Any {@link Throwable}
 * raised by the select is logged at warn level and not propagated.
 */
private void selectAgain() {
    needsToSelectAgain = false;
    try {
        selector.selectNow();
    } catch (Throwable failure) {
        logger.warn("Failed to update SelectionKeys.", failure);
    }
}
}