/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import static java.util.concurrent.
TimeUnit.
NANOSECONDS;
import java.util.concurrent.atomic.
AtomicLong;
import java.util.concurrent.locks.
Condition;
import java.util.concurrent.locks.
ReentrantLock;
import java.util.*;
/**
* A {@link ThreadPoolExecutor} that can additionally schedule
* commands to run after a given delay, or to execute
* periodically. This class is preferable to {@link java.util.Timer}
* when multiple worker threads are needed, or when the additional
* flexibility or capabilities of {@link ThreadPoolExecutor} (which
* this class extends) are required.
*
* <p>Delayed tasks execute no sooner than they are enabled, but
* without any real-time guarantees about when, after they are
* enabled, they will commence. Tasks scheduled for exactly the same
* execution time are enabled in first-in-first-out (FIFO) order of
* submission.
*
* <p>When a submitted task is cancelled before it is run, execution
* is suppressed. By default, such a cancelled task is not
* automatically removed from the work queue until its delay
* elapses. While this enables further inspection and monitoring, it
* may also cause unbounded retention of cancelled tasks. To avoid
* this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
* causes tasks to be immediately removed from the work queue at
* time of cancellation.
*
* <p>Successive executions of a task scheduled via
* {@code scheduleAtFixedRate} or
* {@code scheduleWithFixedDelay} do not overlap. While different
* executions may be performed by different threads, the effects of
* prior executions <a
* href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* those of subsequent ones.
*
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* of the inherited tuning methods are not useful for it. In
* particular, because it acts as a fixed-sized pool using
* {@code corePoolSize} threads and an unbounded queue, adjustments
* to {@code maximumPoolSize} have no useful effect. Additionally, it
* is almost never a good idea to set {@code corePoolSize} to zero or
* use {@code allowCoreThreadTimeOut} because this may leave the pool
* without threads to handle tasks once they become eligible to run.
*
* <p><b>Extension notes:</b> This class overrides the
* {@link ThreadPoolExecutor#execute(Runnable) execute} and
* {@link AbstractExecutorService#submit(Runnable) submit}
* methods to generate internal {@link ScheduledFuture} objects to
* control per-task delays and scheduling. To preserve
* functionality, any further overrides of these methods in
* subclasses must invoke superclass versions, which effectively
* disables additional task customization. However, this class
* provides alternative protected extension method
* {@code decorateTask} (one version each for {@code Runnable} and
* {@code Callable}) that can be used to customize the concrete task
* types used to execute commands entered via {@code execute},
* {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
* and {@code scheduleWithFixedDelay}. By default, a
* {@code ScheduledThreadPoolExecutor} uses a task type extending
* {@link FutureTask}. However, this may be modified or replaced using
* subclasses of the form:
*
* <pre> {@code
* public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
*
* static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
*
* protected <V> RunnableScheduledFuture<V> decorateTask(
* Runnable r, RunnableScheduledFuture<V> task) {
* return new CustomTask<V>(r, task);
* }
*
* protected <V> RunnableScheduledFuture<V> decorateTask(
* Callable<V> c, RunnableScheduledFuture<V> task) {
* return new CustomTask<V>(c, task);
* }
* // ... add constructors, etc.
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
public class
ScheduledThreadPoolExecutor
extends
ThreadPoolExecutor
implements
ScheduledExecutorService {
/*
* This class specializes ThreadPoolExecutor implementation by
*
* 1. Using a custom task type, ScheduledFutureTask for
* tasks, even those that don't require scheduling (i.e.,
* those submitted using ExecutorService execute, not
* ScheduledExecutorService methods) which are treated as
* delayed tasks with a delay of zero.
*
* 2. Using a custom queue (DelayedWorkQueue), a variant of
* unbounded DelayQueue. The lack of capacity constraint and
* the fact that corePoolSize and maximumPoolSize are
* effectively identical simplifies some execution mechanics
* (see delayedExecute) compared to ThreadPoolExecutor.
*
* 3. Supporting optional run-after-shutdown parameters, which
* leads to overrides of shutdown methods to remove and cancel
* tasks that should NOT be run after shutdown, as well as
* different recheck logic when task (re)submission overlaps
* with a shutdown.
*
* 4. Task decoration methods to allow interception and
* instrumentation, which are needed because subclasses cannot
* otherwise override submit methods to get this effect. These
* don't have any impact on pool control logic though.
*/
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean
continueExistingPeriodicTasksAfterShutdown;
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean
executeExistingDelayedTasksAfterShutdown = true;
/**
* True if ScheduledFutureTask.cancel should remove from queue
*/
private volatile boolean
removeOnCancel = false;
/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final
AtomicLong sequencer = new
AtomicLong();
/**
* Returns current nanosecond time.
*/
final long
now() {
return
System.
nanoTime();
}
private class
ScheduledFutureTask<V>
extends
FutureTask<V> implements
RunnableScheduledFuture<V> {
/** Sequence number to break ties FIFO */
private final long
sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private long
time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long
period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V>
outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
int
heapIndex;
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(
Runnable r, V
result, long
ns) {
super(
r,
result);
this.
time =
ns;
this.
period = 0;
this.
sequenceNumber =
sequencer.
getAndIncrement();
}
/**
* Creates a periodic action with given nano time and period.
*/
ScheduledFutureTask(
Runnable r, V
result, long
ns, long
period) {
super(
r,
result);
this.
time =
ns;
this.
period =
period;
this.
sequenceNumber =
sequencer.
getAndIncrement();
}
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(
Callable<V>
callable, long
ns) {
super(
callable);
this.
time =
ns;
this.
period = 0;
this.
sequenceNumber =
sequencer.
getAndIncrement();
}
public long
getDelay(
TimeUnit unit) {
return
unit.
convert(
time -
now(),
NANOSECONDS);
}
public int
compareTo(
Delayed other) {
if (
other == this) // compare zero if same object
return 0;
if (
other instanceof
ScheduledFutureTask) {
ScheduledFutureTask<?>
x = (
ScheduledFutureTask<?>)
other;
long
diff =
time -
x.
time;
if (
diff < 0)
return -1;
else if (
diff > 0)
return 1;
else if (
sequenceNumber <
x.
sequenceNumber)
return -1;
else
return 1;
}
long
diff =
getDelay(
NANOSECONDS) -
other.
getDelay(
NANOSECONDS);
return (
diff < 0) ? -1 : (
diff > 0) ? 1 : 0;
}
/**
* Returns {@code true} if this is a periodic (not a one-shot) action.
*
* @return {@code true} if periodic
*/
public boolean
isPeriodic() {
return
period != 0;
}
/**
* Sets the next time to run for a periodic task.
*/
private void
setNextRunTime() {
long
p =
period;
if (
p > 0)
time +=
p;
else
time =
triggerTime(-
p);
}
public boolean
cancel(boolean
mayInterruptIfRunning) {
boolean
cancelled = super.cancel(
mayInterruptIfRunning);
if (
cancelled &&
removeOnCancel &&
heapIndex >= 0)
remove(this);
return
cancelled;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void
run() {
boolean
periodic =
isPeriodic();
if (!
canRunInCurrentRunState(
periodic))
cancel(false);
else if (!
periodic)
ScheduledFutureTask.super.run();
else if (
ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(
outerTask);
}
}
}
/**
* Returns true if can run a task given current run state
* and run-after-shutdown parameters.
*
* @param periodic true if this task periodic, false if delayed
*/
boolean
canRunInCurrentRunState(boolean
periodic) {
return
isRunningOrShutdown(
periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
* Main execution method for delayed or periodic tasks. If pool
* is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet.) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
private void
delayedExecute(
RunnableScheduledFuture<?>
task) {
if (
isShutdown())
reject(
task);
else {
super.getQueue().
add(
task);
if (
isShutdown() &&
!
canRunInCurrentRunState(
task.
isPeriodic()) &&
remove(
task))
task.
cancel(false);
else
ensurePrestart();
}
}
/**
* Requeues a periodic task unless current run state precludes it.
* Same idea as delayedExecute except drops task rather than rejecting.
*
* @param task the task
*/
void
reExecutePeriodic(
RunnableScheduledFuture<?>
task) {
if (
canRunInCurrentRunState(true)) {
super.getQueue().
add(
task);
if (!
canRunInCurrentRunState(true) &&
remove(
task))
task.
cancel(false);
else
ensurePrestart();
}
}
/**
* Cancels and clears the queue of all tasks that should not be run
* due to shutdown policy. Invoked within super.shutdown.
*/
@
Override void
onShutdown() {
BlockingQueue<
Runnable>
q = super.getQueue();
boolean
keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean
keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!
keepDelayed && !
keepPeriodic) {
for (
Object e :
q.
toArray())
if (
e instanceof
RunnableScheduledFuture<?>)
((
RunnableScheduledFuture<?>)
e).
cancel(false);
q.
clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (
Object e :
q.
toArray()) {
if (
e instanceof
RunnableScheduledFuture) {
RunnableScheduledFuture<?>
t =
(
RunnableScheduledFuture<?>)
e;
if ((
t.
isPeriodic() ? !
keepPeriodic : !
keepDelayed) ||
t.
isCancelled()) { // also remove if already cancelled
if (
q.
remove(
t))
t.
cancel(false);
}
}
}
}
tryTerminate();
}
/**
* Modifies or replaces the task used to execute a runnable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param runnable the submitted Runnable
* @param task the task created to execute the runnable
* @param <V> the type of the task's result
* @return a task that can execute the runnable
* @since 1.6
*/
protected <V>
RunnableScheduledFuture<V>
decorateTask(
Runnable runnable,
RunnableScheduledFuture<V>
task) {
return
task;
}
/**
* Modifies or replaces the task used to execute a callable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param callable the submitted Callable
* @param task the task created to execute the callable
* @param <V> the type of the task's result
* @return a task that can execute the callable
* @since 1.6
*/
protected <V>
RunnableScheduledFuture<V>
decorateTask(
Callable<V>
callable,
RunnableScheduledFuture<V>
task) {
return
task;
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public
ScheduledThreadPoolExecutor(int
corePoolSize) {
super(
corePoolSize,
Integer.
MAX_VALUE, 0,
NANOSECONDS,
new
DelayedWorkQueue());
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public
ScheduledThreadPoolExecutor(int
corePoolSize,
ThreadFactory threadFactory) {
super(
corePoolSize,
Integer.
MAX_VALUE, 0,
NANOSECONDS,
new
DelayedWorkQueue(),
threadFactory);
}
/**
* Creates a new ScheduledThreadPoolExecutor with the given
* initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code handler} is null
*/
public
ScheduledThreadPoolExecutor(int
corePoolSize,
RejectedExecutionHandler handler) {
super(
corePoolSize,
Integer.
MAX_VALUE, 0,
NANOSECONDS,
new
DelayedWorkQueue(),
handler);
}
/**
* Creates a new ScheduledThreadPoolExecutor with the given
* initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} or
* {@code handler} is null
*/
public
ScheduledThreadPoolExecutor(int
corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(
corePoolSize,
Integer.
MAX_VALUE, 0,
NANOSECONDS,
new
DelayedWorkQueue(),
threadFactory,
handler);
}
/**
* Returns the trigger time of a delayed action.
*/
private long
triggerTime(long
delay,
TimeUnit unit) {
return
triggerTime(
unit.
toNanos((
delay < 0) ? 0 :
delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long
triggerTime(long
delay) {
return
now() +
((
delay < (
Long.
MAX_VALUE >> 1)) ?
delay :
overflowFree(
delay));
}
/**
* Constrains the values of all delays in the queue to be within
* Long.MAX_VALUE of each other, to avoid overflow in compareTo.
* This may occur if a task is eligible to be dequeued, but has
* not yet been, while some other task is added with a delay of
* Long.MAX_VALUE.
*/
private long
overflowFree(long
delay) {
Delayed head = (
Delayed) super.getQueue().
peek();
if (
head != null) {
long
headDelay =
head.
getDelay(
NANOSECONDS);
if (
headDelay < 0 && (
delay -
headDelay < 0))
delay =
Long.
MAX_VALUE +
headDelay;
}
return
delay;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public
ScheduledFuture<?>
schedule(
Runnable command,
long
delay,
TimeUnit unit) {
if (
command == null ||
unit == null)
throw new
NullPointerException();
RunnableScheduledFuture<?>
t =
decorateTask(
command,
new
ScheduledFutureTask<
Void>(
command, null,
triggerTime(
delay,
unit)));
delayedExecute(
t);
return
t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V>
ScheduledFuture<V>
schedule(
Callable<V>
callable,
long
delay,
TimeUnit unit) {
if (
callable == null ||
unit == null)
throw new
NullPointerException();
RunnableScheduledFuture<V>
t =
decorateTask(
callable,
new
ScheduledFutureTask<V>(
callable,
triggerTime(
delay,
unit)));
delayedExecute(
t);
return
t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public
ScheduledFuture<?>
scheduleAtFixedRate(
Runnable command,
long
initialDelay,
long
period,
TimeUnit unit) {
if (
command == null ||
unit == null)
throw new
NullPointerException();
if (
period <= 0)
throw new
IllegalArgumentException();
ScheduledFutureTask<
Void>
sft =
new
ScheduledFutureTask<
Void>(
command,
null,
triggerTime(
initialDelay,
unit),
unit.
toNanos(
period));
RunnableScheduledFuture<
Void>
t =
decorateTask(
command,
sft);
sft.
outerTask =
t;
delayedExecute(
t);
return
t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public
ScheduledFuture<?>
scheduleWithFixedDelay(
Runnable command,
long
initialDelay,
long
delay,
TimeUnit unit) {
if (
command == null ||
unit == null)
throw new
NullPointerException();
if (
delay <= 0)
throw new
IllegalArgumentException();
ScheduledFutureTask<
Void>
sft =
new
ScheduledFutureTask<
Void>(
command,
null,
triggerTime(
initialDelay,
unit),
unit.
toNanos(-
delay));
RunnableScheduledFuture<
Void>
t =
decorateTask(
command,
sft);
sft.
outerTask =
t;
delayedExecute(
t);
return
t;
}
/**
* Executes {@code command} with zero required delay.
* This has effect equivalent to
* {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
* Note that inspections of the queue and of the list returned by
* {@code shutdownNow} will access the zero-delayed
* {@link ScheduledFuture}, not the {@code command} itself.
*
* <p>A consequence of the use of {@code ScheduledFuture} objects is
* that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
* called with a null second {@code Throwable} argument, even if the
* {@code command} terminated abruptly. Instead, the {@code Throwable}
* thrown by such a task can be obtained via {@link Future#get}.
*
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution because the
* executor has been shut down
* @throws NullPointerException {@inheritDoc}
*/
public void
execute(
Runnable command) {
schedule(
command, 0,
NANOSECONDS);
}
// Override AbstractExecutorService methods
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public
Future<?>
submit(
Runnable task) {
return
schedule(
task, 0,
NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T>
Future<T>
submit(
Runnable task, T
result) {
return
schedule(
Executors.
callable(
task,
result), 0,
NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T>
Future<T>
submit(
Callable<T>
task) {
return
schedule(
task, 0,
NANOSECONDS);
}
/**
* Sets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow} or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @param value if {@code true}, continue after shutdown, else don't
* @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public void
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean
value) {
continueExistingPeriodicTasksAfterShutdown =
value;
if (!
value &&
isShutdown())
onShutdown();
}
/**
* Gets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow} or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @return {@code true} if will continue after shutdown
* @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
*/
public boolean
getContinueExistingPeriodicTasksAfterShutdownPolicy() {
return
continueExistingPeriodicTasksAfterShutdown;
}
/**
* Sets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @param value if {@code true}, execute after shutdown, else don't
* @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public void
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean
value) {
executeExistingDelayedTasksAfterShutdown =
value;
if (!
value &&
isShutdown())
onShutdown();
}
/**
* Gets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @return {@code true} if will execute after shutdown
* @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
*/
public boolean
getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return
executeExistingDelayedTasksAfterShutdown;
}
/**
* Sets the policy on whether cancelled tasks should be immediately
* removed from the work queue at time of cancellation. This value is
* by default {@code false}.
*
* @param value if {@code true}, remove on cancellation, else don't
* @see #getRemoveOnCancelPolicy
* @since 1.7
*/
public void
setRemoveOnCancelPolicy(boolean
value) {
removeOnCancel =
value;
}
/**
* Gets the policy on whether cancelled tasks should be immediately
* removed from the work queue at time of cancellation. This value is
* by default {@code false}.
*
* @return {@code true} if cancelled tasks are immediately removed
* from the queue
* @see #setRemoveOnCancelPolicy
* @since 1.7
*/
public boolean
getRemoveOnCancelPolicy() {
return
removeOnCancel;
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
* has been set {@code false}, existing delayed tasks whose delays
* have not yet elapsed are cancelled. And unless the {@code
* ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
* {@code true}, future executions of existing periodic tasks will
* be cancelled.
*
* @throws SecurityException {@inheritDoc}
*/
public void
shutdown() {
super.shutdown();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution.
* Each element of this list is a {@link ScheduledFuture},
* including those tasks submitted using {@code execute},
* which are for scheduling purposes used as the basis of a
* zero-delay {@code ScheduledFuture}.
* @throws SecurityException {@inheritDoc}
*/
public
List<
Runnable>
shutdownNow() {
return super.shutdownNow();
}
/**
* Returns the task queue used by this executor. Each element of
* this queue is a {@link ScheduledFuture}, including those
* tasks submitted using {@code execute} which are for scheduling
* purposes used as the basis of a zero-delay
* {@code ScheduledFuture}. Iteration over this queue is
* <em>not</em> guaranteed to traverse tasks in the order in
* which they will execute.
*
* @return the task queue
*/
public
BlockingQueue<
Runnable>
getQueue() {
return super.getQueue();
}
/**
* Specialized delay queue. To mesh with TPE declarations, this
* class must be declared as a BlockingQueue<Runnable> even though
* it can only hold RunnableScheduledFutures.
*/
static class
DelayedWorkQueue extends
AbstractQueue<
Runnable>
implements
BlockingQueue<
Runnable> {
/*
* A DelayedWorkQueue is based on a heap-based data structure
* like those in DelayQueue and PriorityQueue, except that
* every ScheduledFutureTask also records its index into the
* heap array. This eliminates the need to find a task upon
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
private static final int
INITIAL_CAPACITY = 16;
private
RunnableScheduledFuture<?>[]
queue =
new
RunnableScheduledFuture<?>[
INITIAL_CAPACITY];
private final
ReentrantLock lock = new
ReentrantLock();
private int
size = 0;
/**
* Thread designated to wait for the task at the head of the
* queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with a
* task with an earlier expiration time, the leader field is
* invalidated by being reset to null, and some waiting
* thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private
Thread leader = null;
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private final
Condition available =
lock.
newCondition();
/**
* Sets f's heapIndex if it is a ScheduledFutureTask.
*/
private void
setIndex(
RunnableScheduledFuture<?>
f, int
idx) {
if (
f instanceof
ScheduledFutureTask)
((
ScheduledFutureTask)
f).
heapIndex =
idx;
}
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
*/
private void
siftUp(int
k,
RunnableScheduledFuture<?>
key) {
while (
k > 0) {
int
parent = (
k - 1) >>> 1;
RunnableScheduledFuture<?>
e =
queue[
parent];
if (
key.
compareTo(
e) >= 0)
break;
queue[
k] =
e;
setIndex(
e,
k);
k =
parent;
}
queue[
k] =
key;
setIndex(
key,
k);
}
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.
*/
private void
siftDown(int
k,
RunnableScheduledFuture<?>
key) {
int
half =
size >>> 1;
while (
k <
half) {
int
child = (
k << 1) + 1;
RunnableScheduledFuture<?>
c =
queue[
child];
int
right =
child + 1;
if (
right <
size &&
c.
compareTo(
queue[
right]) > 0)
c =
queue[
child =
right];
if (
key.
compareTo(
c) <= 0)
break;
queue[
k] =
c;
setIndex(
c,
k);
k =
child;
}
queue[
k] =
key;
setIndex(
key,
k);
}
/**
* Resizes the heap array. Call only when holding lock.
*/
private void
grow() {
int
oldCapacity =
queue.length;
int
newCapacity =
oldCapacity + (
oldCapacity >> 1); // grow 50%
if (
newCapacity < 0) // overflow
newCapacity =
Integer.
MAX_VALUE;
queue =
Arrays.
copyOf(
queue,
newCapacity);
}
/**
* Finds index of given object, or -1 if absent.
*/
private int
indexOf(
Object x) {
if (
x != null) {
if (
x instanceof
ScheduledFutureTask) {
int
i = ((
ScheduledFutureTask)
x).
heapIndex;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
if (
i >= 0 &&
i <
size &&
queue[
i] ==
x)
return
i;
} else {
for (int
i = 0;
i <
size;
i++)
if (
x.
equals(
queue[
i]))
return
i;
}
}
return -1;
}
public boolean
contains(
Object x) {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
return
indexOf(
x) != -1;
} finally {
lock.
unlock();
}
}
public boolean
remove(
Object x) {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
int
i =
indexOf(
x);
if (
i < 0)
return false;
setIndex(
queue[
i], -1);
int
s = --
size;
RunnableScheduledFuture<?>
replacement =
queue[
s];
queue[
s] = null;
if (
s !=
i) {
siftDown(
i,
replacement);
if (
queue[
i] ==
replacement)
siftUp(
i,
replacement);
}
return true;
} finally {
lock.
unlock();
}
}
public int
size() {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
return
size;
} finally {
lock.
unlock();
}
}
public boolean
isEmpty() {
return
size() == 0;
}
public int
remainingCapacity() {
return
Integer.
MAX_VALUE;
}
public
RunnableScheduledFuture<?>
peek() {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
return
queue[0];
} finally {
lock.
unlock();
}
}
public boolean
offer(
Runnable x) {
if (
x == null)
throw new
NullPointerException();
RunnableScheduledFuture<?>
e = (
RunnableScheduledFuture<?>)
x;
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
int
i =
size;
if (
i >=
queue.length)
grow();
size =
i + 1;
if (
i == 0) {
queue[0] =
e;
setIndex(
e, 0);
} else {
siftUp(
i,
e);
}
if (
queue[0] ==
e) {
leader = null;
available.
signal();
}
} finally {
lock.
unlock();
}
return true;
}
public void
put(
Runnable e) {
offer(
e);
}
public boolean
add(
Runnable e) {
return
offer(
e);
}
public boolean
offer(
Runnable e, long
timeout,
TimeUnit unit) {
return
offer(
e);
}
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* @param f the task to remove and return
*/
private
RunnableScheduledFuture<?>
finishPoll(
RunnableScheduledFuture<?>
f) {
int
s = --
size;
RunnableScheduledFuture<?>
x =
queue[
s];
queue[
s] = null;
if (
s != 0)
siftDown(0,
x);
setIndex(
f, -1);
return
f;
}
public
RunnableScheduledFuture<?>
poll() {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
RunnableScheduledFuture<?>
first =
queue[0];
if (
first == null ||
first.
getDelay(
NANOSECONDS) > 0)
return null;
else
return
finishPoll(
first);
} finally {
lock.
unlock();
}
}
public
RunnableScheduledFuture<?>
take() throws
InterruptedException {
final
ReentrantLock lock = this.
lock;
lock.
lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?>
first =
queue[0];
if (
first == null)
available.
await();
else {
long
delay =
first.
getDelay(
NANOSECONDS);
if (
delay <= 0)
return
finishPoll(
first);
first = null; // don't retain ref while waiting
if (
leader != null)
available.
await();
else {
Thread thisThread =
Thread.
currentThread();
leader =
thisThread;
try {
available.
awaitNanos(
delay);
} finally {
if (
leader ==
thisThread)
leader = null;
}
}
}
}
} finally {
if (
leader == null &&
queue[0] != null)
available.
signal();
lock.
unlock();
}
}
public
RunnableScheduledFuture<?>
poll(long
timeout,
TimeUnit unit)
throws
InterruptedException {
long
nanos =
unit.
toNanos(
timeout);
final
ReentrantLock lock = this.
lock;
lock.
lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?>
first =
queue[0];
if (
first == null) {
if (
nanos <= 0)
return null;
else
nanos =
available.
awaitNanos(
nanos);
} else {
long
delay =
first.
getDelay(
NANOSECONDS);
if (
delay <= 0)
return
finishPoll(
first);
if (
nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (
nanos <
delay ||
leader != null)
nanos =
available.
awaitNanos(
nanos);
else {
Thread thisThread =
Thread.
currentThread();
leader =
thisThread;
try {
long
timeLeft =
available.
awaitNanos(
delay);
nanos -=
delay -
timeLeft;
} finally {
if (
leader ==
thisThread)
leader = null;
}
}
}
}
} finally {
if (
leader == null &&
queue[0] != null)
available.
signal();
lock.
unlock();
}
}
public void
clear() {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
for (int
i = 0;
i <
size;
i++) {
RunnableScheduledFuture<?>
t =
queue[
i];
if (
t != null) {
queue[
i] = null;
setIndex(
t, -1);
}
}
size = 0;
} finally {
lock.
unlock();
}
}
/**
* Returns first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private
RunnableScheduledFuture<?>
peekExpired() {
// assert lock.isHeldByCurrentThread();
RunnableScheduledFuture<?>
first =
queue[0];
return (
first == null ||
first.
getDelay(
NANOSECONDS) > 0) ?
null :
first;
}
public int
drainTo(
Collection<? super
Runnable>
c) {
if (
c == null)
throw new
NullPointerException();
if (
c == this)
throw new
IllegalArgumentException();
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
RunnableScheduledFuture<?>
first;
int
n = 0;
while ((
first =
peekExpired()) != null) {
c.
add(
first); // In this order, in case add() throws.
finishPoll(
first);
++
n;
}
return
n;
} finally {
lock.
unlock();
}
}
public int
drainTo(
Collection<? super
Runnable>
c, int
maxElements) {
if (
c == null)
throw new
NullPointerException();
if (
c == this)
throw new
IllegalArgumentException();
if (
maxElements <= 0)
return 0;
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
RunnableScheduledFuture<?>
first;
int
n = 0;
while (
n <
maxElements && (
first =
peekExpired()) != null) {
c.
add(
first); // In this order, in case add() throws.
finishPoll(
first);
++
n;
}
return
n;
} finally {
lock.
unlock();
}
}
public
Object[]
toArray() {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
return
Arrays.
copyOf(
queue,
size,
Object[].class);
} finally {
lock.
unlock();
}
}
@
SuppressWarnings("unchecked")
public <T> T[]
toArray(T[]
a) {
final
ReentrantLock lock = this.
lock;
lock.
lock();
try {
if (
a.length <
size)
return (T[])
Arrays.
copyOf(
queue,
size,
a.
getClass());
System.
arraycopy(
queue, 0,
a, 0,
size);
if (
a.length >
size)
a[
size] = null;
return
a;
} finally {
lock.
unlock();
}
}
public
Iterator<
Runnable>
iterator() {
return new
Itr(
Arrays.
copyOf(
queue,
size));
}
/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class
Itr implements
Iterator<
Runnable> {
final
RunnableScheduledFuture<?>[]
array;
int
cursor = 0; // index of next element to return
int
lastRet = -1; // index of last element, or -1 if no such
Itr(
RunnableScheduledFuture<?>[]
array) {
this.
array =
array;
}
public boolean
hasNext() {
return
cursor <
array.length;
}
public
Runnable next() {
if (
cursor >=
array.length)
throw new
NoSuchElementException();
lastRet =
cursor;
return
array[
cursor++];
}
public void
remove() {
if (
lastRet < 0)
throw new
IllegalStateException();
DelayedWorkQueue.this.
remove(
array[
lastRet]);
lastRet = -1;
}
}
}
}