/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.
ObjectUtil;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
SystemPropertyUtil;
import io.netty.util.internal.
UnstableApi;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.lang.
Thread.
State;
import java.util.
ArrayList;
import java.util.
Collection;
import java.util.
LinkedHashSet;
import java.util.
List;
import java.util.
Queue;
import java.util.
Set;
import java.util.concurrent.
BlockingQueue;
import java.util.concurrent.
Callable;
import java.util.concurrent.
ExecutionException;
import java.util.concurrent.
Executor;
import java.util.concurrent.
LinkedBlockingQueue;
import java.util.concurrent.
RejectedExecutionException;
import java.util.concurrent.
Semaphore;
import java.util.concurrent.
ThreadFactory;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.
TimeoutException;
import java.util.concurrent.atomic.
AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.
AtomicReferenceFieldUpdater;
/**
* Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
*
*/
public abstract class
SingleThreadEventExecutor extends
AbstractScheduledEventExecutor implements
OrderedEventExecutor {
static final int
DEFAULT_MAX_PENDING_EXECUTOR_TASKS =
Math.
max(16,
SystemPropertyUtil.
getInt("io.netty.eventexecutor.maxPendingTasks",
Integer.
MAX_VALUE));
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
SingleThreadEventExecutor.class);
private static final int
ST_NOT_STARTED = 1;
private static final int
ST_STARTED = 2;
private static final int
ST_SHUTTING_DOWN = 3;
private static final int
ST_SHUTDOWN = 4;
private static final int
ST_TERMINATED = 5;
private static final
Runnable WAKEUP_TASK = new
Runnable() {
@
Override
public void
run() {
// Do nothing.
}
};
private static final
Runnable NOOP_TASK = new
Runnable() {
@
Override
public void
run() {
// Do nothing.
}
};
private static final
AtomicIntegerFieldUpdater<
SingleThreadEventExecutor>
STATE_UPDATER =
AtomicIntegerFieldUpdater.
newUpdater(
SingleThreadEventExecutor.class, "state");
private static final
AtomicReferenceFieldUpdater<
SingleThreadEventExecutor,
ThreadProperties>
PROPERTIES_UPDATER =
AtomicReferenceFieldUpdater.
newUpdater(
SingleThreadEventExecutor.class,
ThreadProperties.class, "threadProperties");
private final
Queue<
Runnable>
taskQueue;
private volatile
Thread thread;
@
SuppressWarnings("unused")
private volatile
ThreadProperties threadProperties;
private final
Executor executor;
private volatile boolean
interrupted;
private final
Semaphore threadLock = new
Semaphore(0);
private final
Set<
Runnable>
shutdownHooks = new
LinkedHashSet<
Runnable>();
private final boolean
addTaskWakesUp;
private final int
maxPendingTasks;
private final
RejectedExecutionHandler rejectedExecutionHandler;
private long
lastExecutionTime;
@
SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int
state =
ST_NOT_STARTED;
private volatile long
gracefulShutdownQuietPeriod;
private volatile long
gracefulShutdownTimeout;
private long
gracefulShutdownStartTime;
private final
Promise<?>
terminationFuture = new
DefaultPromise<
Void>(
GlobalEventExecutor.
INSTANCE);
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected
SingleThreadEventExecutor(
EventExecutorGroup parent,
ThreadFactory threadFactory, boolean
addTaskWakesUp) {
this(
parent, new
ThreadPerTaskExecutor(
threadFactory),
addTaskWakesUp);
}
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/
protected
SingleThreadEventExecutor(
EventExecutorGroup parent,
ThreadFactory threadFactory,
boolean
addTaskWakesUp, int
maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
this(
parent, new
ThreadPerTaskExecutor(
threadFactory),
addTaskWakesUp,
maxPendingTasks,
rejectedHandler);
}
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected
SingleThreadEventExecutor(
EventExecutorGroup parent,
Executor executor, boolean
addTaskWakesUp) {
this(
parent,
executor,
addTaskWakesUp,
DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
RejectedExecutionHandlers.
reject());
}
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/
protected
SingleThreadEventExecutor(
EventExecutorGroup parent,
Executor executor,
boolean
addTaskWakesUp, int
maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(
parent);
this.
addTaskWakesUp =
addTaskWakesUp;
this.
maxPendingTasks =
Math.
max(16,
maxPendingTasks);
this.
executor =
ObjectUtil.
checkNotNull(
executor, "executor");
taskQueue =
newTaskQueue(this.
maxPendingTasks);
rejectedExecutionHandler =
ObjectUtil.
checkNotNull(
rejectedHandler, "rejectedHandler");
}
/**
* @deprecated Please use and override {@link #newTaskQueue(int)}.
*/
@
Deprecated
protected
Queue<
Runnable>
newTaskQueue() {
return
newTaskQueue(
maxPendingTasks);
}
/**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all.
*/
protected
Queue<
Runnable>
newTaskQueue(int
maxPendingTasks) {
return new
LinkedBlockingQueue<
Runnable>(
maxPendingTasks);
}
/**
* Interrupt the current running {@link Thread}.
*/
protected void
interruptThread() {
Thread currentThread =
thread;
if (
currentThread == null) {
interrupted = true;
} else {
currentThread.
interrupt();
}
}
/**
* @see Queue#poll()
*/
protected
Runnable pollTask() {
assert
inEventLoop();
return
pollTaskFrom(
taskQueue);
}
protected static
Runnable pollTaskFrom(
Queue<
Runnable>
taskQueue) {
for (;;) {
Runnable task =
taskQueue.
poll();
if (
task ==
WAKEUP_TASK) {
continue;
}
return
task;
}
}
/**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
* <p>
* Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
* created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
* </p>
*
* @return {@code null} if the executor thread has been interrupted or waken up.
*/
protected
Runnable takeTask() {
assert
inEventLoop();
if (!(
taskQueue instanceof
BlockingQueue)) {
throw new
UnsupportedOperationException();
}
BlockingQueue<
Runnable>
taskQueue = (
BlockingQueue<
Runnable>) this.
taskQueue;
for (;;) {
ScheduledFutureTask<?>
scheduledTask =
peekScheduledTask();
if (
scheduledTask == null) {
Runnable task = null;
try {
task =
taskQueue.
take();
if (
task ==
WAKEUP_TASK) {
task = null;
}
} catch (
InterruptedException e) {
// Ignore
}
return
task;
} else {
long
delayNanos =
scheduledTask.
delayNanos();
Runnable task = null;
if (
delayNanos > 0) {
try {
task =
taskQueue.
poll(
delayNanos,
TimeUnit.
NANOSECONDS);
} catch (
InterruptedException e) {
// Waken up.
return null;
}
}
if (
task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task =
taskQueue.
poll();
}
if (
task != null) {
return
task;
}
}
}
}
private boolean
fetchFromScheduledTaskQueue() {
long
nanoTime =
AbstractScheduledEventExecutor.
nanoTime();
Runnable scheduledTask =
pollScheduledTask(
nanoTime);
while (
scheduledTask != null) {
if (!
taskQueue.
offer(
scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().
add((
ScheduledFutureTask<?>)
scheduledTask);
return false;
}
scheduledTask =
pollScheduledTask(
nanoTime);
}
return true;
}
/**
* @see Queue#peek()
*/
protected
Runnable peekTask() {
assert
inEventLoop();
return
taskQueue.
peek();
}
/**
* @see Queue#isEmpty()
*/
protected boolean
hasTasks() {
assert
inEventLoop();
return !
taskQueue.
isEmpty();
}
/**
* Return the number of tasks that are pending for processing.
*
* <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
* SingleThreadEventExecutor. So use it with care!</strong>
*/
public int
pendingTasks() {
return
taskQueue.
size();
}
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void
addTask(
Runnable task) {
if (
task == null) {
throw new
NullPointerException("task");
}
if (!
offerTask(
task)) {
reject(
task);
}
}
final boolean
offerTask(
Runnable task) {
if (
isShutdown()) {
reject();
}
return
taskQueue.
offer(
task);
}
/**
* @see Queue#remove(Object)
*/
protected boolean
removeTask(
Runnable task) {
if (
task == null) {
throw new
NullPointerException("task");
}
return
taskQueue.
remove(
task);
}
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
*
* @return {@code true} if and only if at least one task was run
*/
protected boolean
runAllTasks() {
assert
inEventLoop();
boolean
fetchedAll;
boolean
ranAtLeastOne = false;
do {
fetchedAll =
fetchFromScheduledTaskQueue();
if (
runAllTasksFrom(
taskQueue)) {
ranAtLeastOne = true;
}
} while (!
fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (
ranAtLeastOne) {
lastExecutionTime =
ScheduledFutureTask.
nanoTime();
}
afterRunningAllTasks();
return
ranAtLeastOne;
}
/**
* Runs all tasks from the passed {@code taskQueue}.
*
* @param taskQueue To poll and execute all tasks.
*
* @return {@code true} if at least one task was executed.
*/
protected final boolean
runAllTasksFrom(
Queue<
Runnable>
taskQueue) {
Runnable task =
pollTaskFrom(
taskQueue);
if (
task == null) {
return false;
}
for (;;) {
safeExecute(
task);
task =
pollTaskFrom(
taskQueue);
if (
task == null) {
return true;
}
}
}
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean
runAllTasks(long
timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task =
pollTask();
if (
task == null) {
afterRunningAllTasks();
return false;
}
final long
deadline =
ScheduledFutureTask.
nanoTime() +
timeoutNanos;
long
runTasks = 0;
long
lastExecutionTime;
for (;;) {
safeExecute(
task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((
runTasks & 0x3F) == 0) {
lastExecutionTime =
ScheduledFutureTask.
nanoTime();
if (
lastExecutionTime >=
deadline) {
break;
}
}
task =
pollTask();
if (
task == null) {
lastExecutionTime =
ScheduledFutureTask.
nanoTime();
break;
}
}
afterRunningAllTasks();
this.
lastExecutionTime =
lastExecutionTime;
return true;
}
/**
* Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
*/
@
UnstableApi
protected void
afterRunningAllTasks() { }
/**
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
protected long
delayNanos(long
currentTimeNanos) {
ScheduledFutureTask<?>
scheduledTask =
peekScheduledTask();
if (
scheduledTask == null) {
return
SCHEDULE_PURGE_INTERVAL;
}
return
scheduledTask.
delayNanos(
currentTimeNanos);
}
/**
* Updates the internal timestamp that tells when a submitted task was executed most recently.
* {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
* usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or
* {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
* checks.
*/
protected void
updateLastExecutionTime() {
lastExecutionTime =
ScheduledFutureTask.
nanoTime();
}
/**
*
*/
protected abstract void
run();
/**
* Do nothing, sub-classes may override
*/
protected void
cleanup() {
// NOOP
}
protected void
wakeup(boolean
inEventLoop) {
if (!
inEventLoop ||
state ==
ST_SHUTTING_DOWN) {
// Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
// is already something in the queue.
taskQueue.
offer(
WAKEUP_TASK);
}
}
@
Override
public boolean
inEventLoop(
Thread thread) {
return
thread == this.
thread;
}
/**
* Add a {@link Runnable} which will be executed on shutdown of this instance
*/
public void
addShutdownHook(final
Runnable task) {
if (
inEventLoop()) {
shutdownHooks.
add(
task);
} else {
execute(new
Runnable() {
@
Override
public void
run() {
shutdownHooks.
add(
task);
}
});
}
}
/**
* Remove a previous added {@link Runnable} as a shutdown hook
*/
public void
removeShutdownHook(final
Runnable task) {
if (
inEventLoop()) {
shutdownHooks.
remove(
task);
} else {
execute(new
Runnable() {
@
Override
public void
run() {
shutdownHooks.
remove(
task);
}
});
}
}
private boolean
runShutdownHooks() {
boolean
ran = false;
// Note shutdown hooks can add / remove shutdown hooks.
while (!
shutdownHooks.
isEmpty()) {
List<
Runnable>
copy = new
ArrayList<
Runnable>(
shutdownHooks);
shutdownHooks.
clear();
for (
Runnable task:
copy) {
try {
task.
run();
} catch (
Throwable t) {
logger.
warn("Shutdown hook raised an exception.",
t);
} finally {
ran = true;
}
}
}
if (
ran) {
lastExecutionTime =
ScheduledFutureTask.
nanoTime();
}
return
ran;
}
@
Override
public
Future<?>
shutdownGracefully(long
quietPeriod, long
timeout,
TimeUnit unit) {
if (
quietPeriod < 0) {
throw new
IllegalArgumentException("quietPeriod: " +
quietPeriod + " (expected >= 0)");
}
if (
timeout <
quietPeriod) {
throw new
IllegalArgumentException(
"timeout: " +
timeout + " (expected >= quietPeriod (" +
quietPeriod + "))");
}
if (
unit == null) {
throw new
NullPointerException("unit");
}
if (
isShuttingDown()) {
return
terminationFuture();
}
boolean
inEventLoop =
inEventLoop();
boolean
wakeup;
int
oldState;
for (;;) {
if (
isShuttingDown()) {
return
terminationFuture();
}
int
newState;
wakeup = true;
oldState =
state;
if (
inEventLoop) {
newState =
ST_SHUTTING_DOWN;
} else {
switch (
oldState) {
case
ST_NOT_STARTED:
case
ST_STARTED:
newState =
ST_SHUTTING_DOWN;
break;
default:
newState =
oldState;
wakeup = false;
}
}
if (
STATE_UPDATER.
compareAndSet(this,
oldState,
newState)) {
break;
}
}
gracefulShutdownQuietPeriod =
unit.
toNanos(
quietPeriod);
gracefulShutdownTimeout =
unit.
toNanos(
timeout);
if (
oldState ==
ST_NOT_STARTED) {
try {
doStartThread();
} catch (
Throwable cause) {
STATE_UPDATER.
set(this,
ST_TERMINATED);
terminationFuture.
tryFailure(
cause);
if (!(
cause instanceof
Exception)) {
// Also rethrow as it may be an OOME for example
PlatformDependent.
throwException(
cause);
}
return
terminationFuture;
}
}
if (
wakeup) {
wakeup(
inEventLoop);
}
return
terminationFuture();
}
@
Override
public
Future<?>
terminationFuture() {
return
terminationFuture;
}
@
Override
@
Deprecated
public void
shutdown() {
if (
isShutdown()) {
return;
}
boolean
inEventLoop =
inEventLoop();
boolean
wakeup;
int
oldState;
for (;;) {
if (
isShuttingDown()) {
return;
}
int
newState;
wakeup = true;
oldState =
state;
if (
inEventLoop) {
newState =
ST_SHUTDOWN;
} else {
switch (
oldState) {
case
ST_NOT_STARTED:
case
ST_STARTED:
case
ST_SHUTTING_DOWN:
newState =
ST_SHUTDOWN;
break;
default:
newState =
oldState;
wakeup = false;
}
}
if (
STATE_UPDATER.
compareAndSet(this,
oldState,
newState)) {
break;
}
}
if (
oldState ==
ST_NOT_STARTED) {
try {
doStartThread();
} catch (
Throwable cause) {
STATE_UPDATER.
set(this,
ST_TERMINATED);
terminationFuture.
tryFailure(
cause);
if (!(
cause instanceof
Exception)) {
// Also rethrow as it may be an OOME for example
PlatformDependent.
throwException(
cause);
}
return;
}
}
if (
wakeup) {
wakeup(
inEventLoop);
}
}
@
Override
public boolean
isShuttingDown() {
return
state >=
ST_SHUTTING_DOWN;
}
@
Override
public boolean
isShutdown() {
return
state >=
ST_SHUTDOWN;
}
@
Override
public boolean
isTerminated() {
return
state ==
ST_TERMINATED;
}
/**
* Confirm that the shutdown if the instance should be done now!
*/
protected boolean
confirmShutdown() {
if (!
isShuttingDown()) {
return false;
}
if (!
inEventLoop()) {
throw new
IllegalStateException("must be invoked from an event loop");
}
cancelScheduledTasks();
if (
gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime =
ScheduledFutureTask.
nanoTime();
}
if (
runAllTasks() ||
runShutdownHooks()) {
if (
isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
// terminate if the quiet period is 0.
// See https://github.com/netty/netty/issues/4241
if (
gracefulShutdownQuietPeriod == 0) {
return true;
}
wakeup(true);
return false;
}
final long
nanoTime =
ScheduledFutureTask.
nanoTime();
if (
isShutdown() ||
nanoTime -
gracefulShutdownStartTime >
gracefulShutdownTimeout) {
return true;
}
if (
nanoTime -
lastExecutionTime <=
gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
Thread.
sleep(100);
} catch (
InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
@
Override
public boolean
awaitTermination(long
timeout,
TimeUnit unit) throws
InterruptedException {
if (
unit == null) {
throw new
NullPointerException("unit");
}
if (
inEventLoop()) {
throw new
IllegalStateException("cannot await termination of the current thread");
}
if (
threadLock.
tryAcquire(
timeout,
unit)) {
threadLock.
release();
}
return
isTerminated();
}
@
Override
public void
execute(
Runnable task) {
if (
task == null) {
throw new
NullPointerException("task");
}
boolean
inEventLoop =
inEventLoop();
addTask(
task);
if (!
inEventLoop) {
startThread();
if (
isShutdown() &&
removeTask(
task)) {
reject();
}
}
if (!
addTaskWakesUp &&
wakesUpForTask(
task)) {
wakeup(
inEventLoop);
}
}
@
Override
public <T> T
invokeAny(
Collection<? extends
Callable<T>>
tasks) throws
InterruptedException,
ExecutionException {
throwIfInEventLoop("invokeAny");
return super.invokeAny(
tasks);
}
@
Override
public <T> T
invokeAny(
Collection<? extends
Callable<T>>
tasks, long
timeout,
TimeUnit unit)
throws
InterruptedException,
ExecutionException,
TimeoutException {
throwIfInEventLoop("invokeAny");
return super.invokeAny(
tasks,
timeout,
unit);
}
@
Override
public <T>
List<java.util.concurrent.
Future<T>>
invokeAll(
Collection<? extends
Callable<T>>
tasks)
throws
InterruptedException {
throwIfInEventLoop("invokeAll");
return super.invokeAll(
tasks);
}
@
Override
public <T>
List<java.util.concurrent.
Future<T>>
invokeAll(
Collection<? extends
Callable<T>>
tasks, long
timeout,
TimeUnit unit) throws
InterruptedException {
throwIfInEventLoop("invokeAll");
return super.invokeAll(
tasks,
timeout,
unit);
}
private void
throwIfInEventLoop(
String method) {
if (
inEventLoop()) {
throw new
RejectedExecutionException("Calling " +
method + " from within the EventLoop is not allowed");
}
}
/**
* Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
* If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the
* it is fully started.
*/
public final
ThreadProperties threadProperties() {
ThreadProperties threadProperties = this.
threadProperties;
if (
threadProperties == null) {
Thread thread = this.
thread;
if (
thread == null) {
assert !
inEventLoop();
submit(
NOOP_TASK).
syncUninterruptibly();
thread = this.
thread;
assert
thread != null;
}
threadProperties = new
DefaultThreadProperties(
thread);
if (!
PROPERTIES_UPDATER.
compareAndSet(this, null,
threadProperties)) {
threadProperties = this.
threadProperties;
}
}
return
threadProperties;
}
@
SuppressWarnings("unused")
protected boolean
wakesUpForTask(
Runnable task) {
return true;
}
protected static void
reject() {
throw new
RejectedExecutionException("event executor terminated");
}
/**
* Offers the task to the associated {@link RejectedExecutionHandler}.
*
* @param task to reject.
*/
protected final void
reject(
Runnable task) {
rejectedExecutionHandler.
rejected(
task, this);
}
// ScheduledExecutorService implementation
private static final long
SCHEDULE_PURGE_INTERVAL =
TimeUnit.
SECONDS.
toNanos(1);
private void
startThread() {
if (
state ==
ST_NOT_STARTED) {
if (
STATE_UPDATER.
compareAndSet(this,
ST_NOT_STARTED,
ST_STARTED)) {
try {
doStartThread();
} catch (
Throwable cause) {
STATE_UPDATER.
set(this,
ST_NOT_STARTED);
PlatformDependent.
throwException(
cause);
}
}
}
}
private void
doStartThread() {
assert
thread == null;
executor.
execute(new
Runnable() {
@
Override
public void
run() {
thread =
Thread.
currentThread();
if (
interrupted) {
thread.
interrupt();
}
boolean
success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.
run();
success = true;
} catch (
Throwable t) {
logger.
warn("Unexpected exception from an event executor: ",
t);
} finally {
for (;;) {
int
oldState =
state;
if (
oldState >=
ST_SHUTTING_DOWN ||
STATE_UPDATER.
compareAndSet(
SingleThreadEventExecutor.this,
oldState,
ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (
success &&
gracefulShutdownStartTime == 0) {
if (
logger.
isErrorEnabled()) {
logger.
error("Buggy " +
EventExecutor.class.
getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.
getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (
confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.
set(
SingleThreadEventExecutor.this,
ST_TERMINATED);
threadLock.
release();
if (!
taskQueue.
isEmpty()) {
if (
logger.
isWarnEnabled()) {
logger.
warn("An event executor terminated with " +
"non-empty task queue (" +
taskQueue.
size() + ')');
}
}
terminationFuture.
setSuccess(null);
}
}
}
}
});
}
private static final class
DefaultThreadProperties implements
ThreadProperties {
private final
Thread t;
DefaultThreadProperties(
Thread t) {
this.
t =
t;
}
@
Override
public
State state() {
return
t.
getState();
}
@
Override
public int
priority() {
return
t.
getPriority();
}
@
Override
public boolean
isInterrupted() {
return
t.
isInterrupted();
}
@
Override
public boolean
isDaemon() {
return
t.
isDaemon();
}
@
Override
public
String name() {
return
t.
getName();
}
@
Override
public long
id() {
return
t.
getId();
}
@
Override
public
StackTraceElement[]
stackTrace() {
return
t.
getStackTrace();
}
@
Override
public boolean
isAlive() {
return
t.
isAlive();
}
}
}