/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.util.
Collections;
import java.util.
HashSet;
import java.util.
Queue;
import java.util.
Set;
import java.util.concurrent.
CountDownLatch;
import java.util.concurrent.
Executors;
import java.util.concurrent.
RejectedExecutionException;
import java.util.concurrent.
ThreadFactory;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicBoolean;
import java.util.concurrent.atomic.
AtomicInteger;
import java.util.concurrent.atomic.
AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.
AtomicLong;
import static io.netty.util.internal.
StringUtil.simpleClassName;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
* <h3>Tick Duration</h3>
*
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
* them.
* <p>
* You can increase or decrease the accuracy of the execution timing by
* specifying smaller or larger tick duration in the constructor. In most
* network applications, I/O timeout does not need to be accurate. Therefore,
* the default tick duration is 100 milliseconds and you will not need to try
* different configurations in most cases.
*
* <h3>Ticks per Wheel (Wheel Size)</h3>
*
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
* function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
* if you are going to schedule a lot of timeouts.
*
* <h3>Do not create many instances.</h3>
*
* {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
* started. Therefore, you should make sure to create only one instance and
* share it across your application. One of the common mistakes, that makes
* your application unresponsive, is to create a new instance for every connection.
*
* <h3>Implementation Details</h3>
*
* {@link HashedWheelTimer} is based on
* <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
* Tony Lauck's paper,
* <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'</a>. More comprehensive slides are located
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
*/
public class
HashedWheelTimer implements
Timer {
static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
HashedWheelTimer.class);
private static final
AtomicInteger INSTANCE_COUNTER = new
AtomicInteger();
private static final
AtomicBoolean WARNED_TOO_MANY_INSTANCES = new
AtomicBoolean();
private static final int
INSTANCE_COUNT_LIMIT = 64;
private static final
ResourceLeakDetector<
HashedWheelTimer>
leakDetector =
ResourceLeakDetectorFactory.
instance()
.
newResourceLeakDetector(
HashedWheelTimer.class, 1);
private static final
AtomicIntegerFieldUpdater<
HashedWheelTimer>
WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.
newUpdater(
HashedWheelTimer.class, "workerState");
private final
ResourceLeakTracker<
HashedWheelTimer>
leak;
private final
Worker worker = new
Worker();
private final
Thread workerThread;
public static final int
WORKER_STATE_INIT = 0;
public static final int
WORKER_STATE_STARTED = 1;
public static final int
WORKER_STATE_SHUTDOWN = 2;
@
SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int
workerState; // 0 - init, 1 - started, 2 - shut down
private final long
tickDuration;
private final
HashedWheelBucket[]
wheel;
private final int
mask;
private final
CountDownLatch startTimeInitialized = new
CountDownLatch(1);
private final
Queue<
HashedWheelTimeout>
timeouts =
PlatformDependent.
newMpscQueue();
private final
Queue<
HashedWheelTimeout>
cancelledTimeouts =
PlatformDependent.
newMpscQueue();
private final
AtomicLong pendingTimeouts = new
AtomicLong(0);
private final long
maxPendingTimeouts;
private volatile long
startTime;
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}), default tick duration, and
* default number of ticks per wheel.
*/
public
HashedWheelTimer() {
this(
Executors.
defaultThreadFactory());
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}) and default number of ticks
* per wheel.
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is <= 0
*/
public
HashedWheelTimer(long
tickDuration,
TimeUnit unit) {
this(
Executors.
defaultThreadFactory(),
tickDuration,
unit);
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}).
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public
HashedWheelTimer(long
tickDuration,
TimeUnit unit, int
ticksPerWheel) {
this(
Executors.
defaultThreadFactory(),
tickDuration,
unit,
ticksPerWheel);
}
/**
* Creates a new timer with the default tick duration and default number of
* ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @throws NullPointerException if {@code threadFactory} is {@code null}
*/
public
HashedWheelTimer(
ThreadFactory threadFactory) {
this(
threadFactory, 100,
TimeUnit.
MILLISECONDS);
}
/**
* Creates a new timer with the default number of ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is <= 0
*/
public
HashedWheelTimer(
ThreadFactory threadFactory, long
tickDuration,
TimeUnit unit) {
this(
threadFactory,
tickDuration,
unit, 512);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public
HashedWheelTimer(
ThreadFactory threadFactory,
long
tickDuration,
TimeUnit unit, int
ticksPerWheel) {
this(
threadFactory,
tickDuration,
unit,
ticksPerWheel, true);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is not
* a daemon thread.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public
HashedWheelTimer(
ThreadFactory threadFactory,
long
tickDuration,
TimeUnit unit, int
ticksPerWheel, boolean
leakDetection) {
this(
threadFactory,
tickDuration,
unit,
ticksPerWheel,
leakDetection, -1);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is not
* a daemon thread.
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link java.util.concurrent.RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed if
* this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public
HashedWheelTimer(
ThreadFactory threadFactory,
long
tickDuration,
TimeUnit unit, int
ticksPerWheel, boolean
leakDetection,
long
maxPendingTimeouts) {
if (
threadFactory == null) {
throw new
NullPointerException("threadFactory");
}
if (
unit == null) {
throw new
NullPointerException("unit");
}
if (
tickDuration <= 0) {
throw new
IllegalArgumentException("tickDuration must be greater than 0: " +
tickDuration);
}
if (
ticksPerWheel <= 0) {
throw new
IllegalArgumentException("ticksPerWheel must be greater than 0: " +
ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel =
createWheel(
ticksPerWheel);
mask =
wheel.length - 1;
// Convert tickDuration to nanos.
this.
tickDuration =
unit.
toNanos(
tickDuration);
// Prevent overflow.
if (this.
tickDuration >=
Long.
MAX_VALUE /
wheel.length) {
throw new
IllegalArgumentException(
String.
format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration,
Long.
MAX_VALUE /
wheel.length));
}
workerThread =
threadFactory.
newThread(
worker);
leak =
leakDetection || !
workerThread.
isDaemon() ?
leakDetector.
track(this) : null;
this.
maxPendingTimeouts =
maxPendingTimeouts;
if (
INSTANCE_COUNTER.
incrementAndGet() >
INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.
compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@
Override
protected void
finalize() throws
Throwable {
try {
super.finalize();
} finally {
// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
// we have not yet shutdown then we want to make sure we decrement the active instance count.
if (
WORKER_STATE_UPDATER.
getAndSet(this,
WORKER_STATE_SHUTDOWN) !=
WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.
decrementAndGet();
}
}
}
private static
HashedWheelBucket[]
createWheel(int
ticksPerWheel) {
if (
ticksPerWheel <= 0) {
throw new
IllegalArgumentException(
"ticksPerWheel must be greater than 0: " +
ticksPerWheel);
}
if (
ticksPerWheel > 1073741824) {
throw new
IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " +
ticksPerWheel);
}
ticksPerWheel =
normalizeTicksPerWheel(
ticksPerWheel);
HashedWheelBucket[]
wheel = new
HashedWheelBucket[
ticksPerWheel];
for (int
i = 0;
i <
wheel.length;
i ++) {
wheel[
i] = new
HashedWheelBucket();
}
return
wheel;
}
private static int
normalizeTicksPerWheel(int
ticksPerWheel) {
int
normalizedTicksPerWheel = 1;
while (
normalizedTicksPerWheel <
ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return
normalizedTicksPerWheel;
}
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void
start() {
switch (
WORKER_STATE_UPDATER.
get(this)) {
case
WORKER_STATE_INIT:
if (
WORKER_STATE_UPDATER.
compareAndSet(this,
WORKER_STATE_INIT,
WORKER_STATE_STARTED)) {
workerThread.
start();
}
break;
case
WORKER_STATE_STARTED:
break;
case
WORKER_STATE_SHUTDOWN:
throw new
IllegalStateException("cannot be started once stopped");
default:
throw new
Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (
startTime == 0) {
try {
startTimeInitialized.
await();
} catch (
InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
@
Override
public
Set<
Timeout>
stop() {
if (
Thread.
currentThread() ==
workerThread) {
throw new
IllegalStateException(
HashedWheelTimer.class.
getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.
getSimpleName());
}
if (!
WORKER_STATE_UPDATER.
compareAndSet(this,
WORKER_STATE_STARTED,
WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (
WORKER_STATE_UPDATER.
getAndSet(this,
WORKER_STATE_SHUTDOWN) !=
WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.
decrementAndGet();
if (
leak != null) {
boolean
closed =
leak.
close(this);
assert
closed;
}
}
return
Collections.
emptySet();
}
try {
boolean
interrupted = false;
while (
workerThread.
isAlive()) {
workerThread.
interrupt();
try {
workerThread.
join(100);
} catch (
InterruptedException ignored) {
interrupted = true;
}
}
if (
interrupted) {
Thread.
currentThread().
interrupt();
}
} finally {
INSTANCE_COUNTER.
decrementAndGet();
if (
leak != null) {
boolean
closed =
leak.
close(this);
assert
closed;
}
}
return
worker.
unprocessedTimeouts();
}
@
Override
public
Timeout newTimeout(
TimerTask task, long
delay,
TimeUnit unit) {
if (
task == null) {
throw new
NullPointerException("task");
}
if (
unit == null) {
throw new
NullPointerException("unit");
}
long
pendingTimeoutsCount =
pendingTimeouts.
incrementAndGet();
if (
maxPendingTimeouts > 0 &&
pendingTimeoutsCount >
maxPendingTimeouts) {
pendingTimeouts.
decrementAndGet();
throw new
RejectedExecutionException("Number of pending timeouts ("
+
pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" +
maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long
deadline =
System.
nanoTime() +
unit.
toNanos(
delay) -
startTime;
// Guard against overflow.
if (
delay > 0 &&
deadline < 0) {
deadline =
Long.
MAX_VALUE;
}
HashedWheelTimeout timeout = new
HashedWheelTimeout(this,
task,
deadline);
timeouts.
add(
timeout);
return
timeout;
}
/**
* Returns the number of pending timeouts of this {@link Timer}.
*/
public long
pendingTimeouts() {
return
pendingTimeouts.
get();
}
private static void
reportTooManyInstances() {
if (
logger.
isErrorEnabled()) {
String resourceType =
simpleClassName(
HashedWheelTimer.class);
logger.
error("You are creating too many " +
resourceType + " instances. " +
resourceType + " is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
}
}
private final class
Worker implements
Runnable {
private final
Set<
Timeout>
unprocessedTimeouts = new
HashSet<
Timeout>();
private long
tick;
@
Override
public void
run() {
// Initialize the startTime.
startTime =
System.
nanoTime();
if (
startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.
countDown();
do {
final long
deadline =
waitForNextTick();
if (
deadline > 0) {
int
idx = (int) (
tick &
mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[
idx];
transferTimeoutsToBuckets();
bucket.
expireTimeouts(
deadline);
tick++;
}
} while (
WORKER_STATE_UPDATER.
get(
HashedWheelTimer.this) ==
WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (
HashedWheelBucket bucket:
wheel) {
bucket.
clearTimeouts(
unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout =
timeouts.
poll();
if (
timeout == null) {
break;
}
if (!
timeout.
isCancelled()) {
unprocessedTimeouts.
add(
timeout);
}
}
processCancelledTasks();
}
private void
transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int
i = 0;
i < 100000;
i++) {
HashedWheelTimeout timeout =
timeouts.
poll();
if (
timeout == null) {
// all processed
break;
}
if (
timeout.
state() ==
HashedWheelTimeout.
ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long
calculated =
timeout.
deadline /
tickDuration;
timeout.
remainingRounds = (
calculated -
tick) /
wheel.length;
final long
ticks =
Math.
max(
calculated,
tick); // Ensure we don't schedule for past.
int
stopIndex = (int) (
ticks &
mask);
HashedWheelBucket bucket =
wheel[
stopIndex];
bucket.
addTimeout(
timeout);
}
}
private void
processCancelledTasks() {
for (;;) {
HashedWheelTimeout timeout =
cancelledTimeouts.
poll();
if (
timeout == null) {
// all processed
break;
}
try {
timeout.
remove();
} catch (
Throwable t) {
if (
logger.
isWarnEnabled()) {
logger.
warn("An exception was thrown while process a cancellation task",
t);
}
}
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long
waitForNextTick() {
long
deadline =
tickDuration * (
tick + 1);
for (;;) {
final long
currentTime =
System.
nanoTime() -
startTime;
long
sleepTimeMs = (
deadline -
currentTime + 999999) / 1000000;
if (
sleepTimeMs <= 0) {
if (
currentTime ==
Long.
MIN_VALUE) {
return -
Long.
MAX_VALUE;
} else {
return
currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (
PlatformDependent.
isWindows()) {
sleepTimeMs =
sleepTimeMs / 10 * 10;
}
try {
Thread.
sleep(
sleepTimeMs);
} catch (
InterruptedException ignored) {
if (
WORKER_STATE_UPDATER.
get(
HashedWheelTimer.this) ==
WORKER_STATE_SHUTDOWN) {
return
Long.
MIN_VALUE;
}
}
}
}
public
Set<
Timeout>
unprocessedTimeouts() {
return
Collections.
unmodifiableSet(
unprocessedTimeouts);
}
}
private static final class
HashedWheelTimeout implements
Timeout {
private static final int
ST_INIT = 0;
private static final int
ST_CANCELLED = 1;
private static final int
ST_EXPIRED = 2;
private static final
AtomicIntegerFieldUpdater<
HashedWheelTimeout>
STATE_UPDATER =
AtomicIntegerFieldUpdater.
newUpdater(
HashedWheelTimeout.class, "state");
private final
HashedWheelTimer timer;
private final
TimerTask task;
private final long
deadline;
@
SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int
state =
ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
long
remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;
// The bucket to which the timeout was added
HashedWheelBucket bucket;
HashedWheelTimeout(
HashedWheelTimer timer,
TimerTask task, long
deadline) {
this.
timer =
timer;
this.
task =
task;
this.
deadline =
deadline;
}
@
Override
public
Timer timer() {
return
timer;
}
@
Override
public
TimerTask task() {
return
task;
}
@
Override
public boolean
cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!
compareAndSetState(
ST_INIT,
ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.
cancelledTimeouts.
add(this);
return true;
}
void
remove() {
HashedWheelBucket bucket = this.
bucket;
if (
bucket != null) {
bucket.
remove(this);
} else {
timer.
pendingTimeouts.
decrementAndGet();
}
}
public boolean
compareAndSetState(int
expected, int
state) {
return
STATE_UPDATER.
compareAndSet(this,
expected,
state);
}
public int
state() {
return
state;
}
@
Override
public boolean
isCancelled() {
return
state() ==
ST_CANCELLED;
}
@
Override
public boolean
isExpired() {
return
state() ==
ST_EXPIRED;
}
public void
expire() {
if (!
compareAndSetState(
ST_INIT,
ST_EXPIRED)) {
return;
}
try {
task.
run(this);
} catch (
Throwable t) {
if (
logger.
isWarnEnabled()) {
logger.
warn("An exception was thrown by " +
TimerTask.class.
getSimpleName() + '.',
t);
}
}
}
@
Override
public
String toString() {
final long
currentTime =
System.
nanoTime();
long
remaining =
deadline -
currentTime +
timer.
startTime;
StringBuilder buf = new
StringBuilder(192)
.
append(
simpleClassName(this))
.
append('(')
.
append("deadline: ");
if (
remaining > 0) {
buf.
append(
remaining)
.
append(" ns later");
} else if (
remaining < 0) {
buf.
append(-
remaining)
.
append(" ns ago");
} else {
buf.
append("now");
}
if (
isCancelled()) {
buf.
append(", cancelled");
}
return
buf.
append(", task: ")
.
append(
task())
.
append(')')
.
toString();
}
}
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class
HashedWheelBucket {
// Used for the linked-list datastructure
private
HashedWheelTimeout head;
private
HashedWheelTimeout tail;
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
public void
addTimeout(
HashedWheelTimeout timeout) {
assert
timeout.
bucket == null;
timeout.
bucket = this;
if (
head == null) {
head =
tail =
timeout;
} else {
tail.
next =
timeout;
timeout.
prev =
tail;
tail =
timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void
expireTimeouts(long
deadline) {
HashedWheelTimeout timeout =
head;
// process all timeouts
while (
timeout != null) {
HashedWheelTimeout next =
timeout.
next;
if (
timeout.
remainingRounds <= 0) {
next =
remove(
timeout);
if (
timeout.
deadline <=
deadline) {
timeout.
expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new
IllegalStateException(
String.
format(
"timeout.deadline (%d) > deadline (%d)",
timeout.
deadline,
deadline));
}
} else if (
timeout.
isCancelled()) {
next =
remove(
timeout);
} else {
timeout.
remainingRounds --;
}
timeout =
next;
}
}
public
HashedWheelTimeout remove(
HashedWheelTimeout timeout) {
HashedWheelTimeout next =
timeout.
next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (
timeout.
prev != null) {
timeout.
prev.
next =
next;
}
if (
timeout.
next != null) {
timeout.
next.
prev =
timeout.
prev;
}
if (
timeout ==
head) {
// if timeout is also the tail we need to adjust the entry too
if (
timeout ==
tail) {
tail = null;
head = null;
} else {
head =
next;
}
} else if (
timeout ==
tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail =
timeout.
prev;
}
// null out prev, next and bucket to allow for GC.
timeout.
prev = null;
timeout.
next = null;
timeout.
bucket = null;
timeout.
timer.
pendingTimeouts.
decrementAndGet();
return
next;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
public void
clearTimeouts(
Set<
Timeout>
set) {
for (;;) {
HashedWheelTimeout timeout =
pollTimeout();
if (
timeout == null) {
return;
}
if (
timeout.
isExpired() ||
timeout.
isCancelled()) {
continue;
}
set.
add(
timeout);
}
}
private
HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.
head;
if (
head == null) {
return null;
}
HashedWheelTimeout next =
head.
next;
if (
next == null) {
tail = this.
head = null;
} else {
this.
head =
next;
next.
prev = null;
}
// null out prev and next to allow for GC.
head.
next = null;
head.
prev = null;
head.
bucket = null;
return
head;
}
}
}