/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.lang.
Thread.
UncaughtExceptionHandler;
import java.util.
ArrayList;
import java.util.
Arrays;
import java.util.
Collection;
import java.util.
Collections;
import java.util.
List;
import java.util.concurrent.
AbstractExecutorService;
import java.util.concurrent.
Callable;
import java.util.concurrent.
ExecutorService;
import java.util.concurrent.
Future;
import java.util.concurrent.
RejectedExecutionException;
import java.util.concurrent.
RunnableFuture;
import java.util.concurrent.
ThreadLocalRandom;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicLong;
import java.security.
AccessControlContext;
import java.security.
ProtectionDomain;
import java.security.
Permissions;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>: all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*
* <p>A static {@link #commonPool()} is available and appropriate for
* most applications. The common pool is used by any ForkJoinTask that
* is not explicitly submitted to a specified pool. Using the common
* pool normally reduces resource usage (its threads are slowly
* reclaimed during periods of non-use, and reinstated upon subsequent
* use).
*
* <p>For applications that require separate or custom pools, a {@code
* ForkJoinPool} may be constructed with a given target parallelism
* level; by default, equal to the number of available processors.
* The pool attempts to maintain enough active (or available) threads
* by dynamically adding, suspending, or resuming internal worker
* threads, even if some tasks are stalled waiting to join others.
* However, no such adjustments are guaranteed in the face of blocked
* I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
* <p>In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
* {@link #getStealCount}) that are intended to aid in developing,
* tuning, and monitoring fork/join applications. Also, method
* {@link #toString} returns indications of pool state in a
* convenient form for informal monitoring.
*
* <p>As is the case with other ExecutorServices, there are three
* main task execution methods summarized in the following table.
* These are designed to be used primarily by clients not already
* engaged in fork/join computations in the current pool. The main
* forms of these methods accept instances of {@code ForkJoinTask},
* but overloaded forms also allow mixed execution of plain {@code
* Runnable}- or {@code Callable}- based activities as well. However,
* tasks that are already executing in a pool should normally instead
* use the within-computation forms listed in the table unless using
* async event-style tasks that are not usually joined, in which case
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
* <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
* <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
* <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* <p>The common pool is by default constructed with default
* parameters, but these may be controlled by setting three
* {@linkplain System#getProperty system properties}:
* <ul>
* <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
* - the parallelism level, a non-negative integer
* <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
* - the class name of a {@link ForkJoinWorkerThreadFactory}
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler}
* </ul>
* If a {@link SecurityManager} is present and no factory is
* specified, then the default pool uses a factory supplying
* threads that have no {@link Permissions} enabled.
* The system class loader is used to load these classes.
* Upon any error in establishing these settings, default parameters
* are used. It is possible to disable or limit the use of threads in
* the common pool by setting the parallelism property to zero, and/or
* using a factory that may return {@code null}. However doing so may
* cause unjoined tasks to never be executed.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
* {@code IllegalArgumentException}.
*
* <p>This implementation rejects submitted tasks (that is, by throwing
* {@link RejectedExecutionException}) only when the pool is shut down
* or internal resources have been exhausted.
*
* @since 1.7
* @author Doug Lea
*/
@sun.misc.
Contended
public class
ForkJoinPool extends
AbstractExecutorService {
/*
* Implementation Overview
*
* This class and its nested classes provide the main
* functionality and control for a set of worker threads:
* Submissions from non-FJ threads enter into submission queues.
* Workers take these tasks and typically split them into subtasks
* that may be stolen by other workers. Preference rules give
* first priority to processing tasks from their own queues (LIFO
* or FIFO, depending on mode), then to randomized FIFO steals of
* tasks in other queues. This framework began as vehicle for
* supporting tree-structured parallelism using work-stealing.
* Over time, its scalability advantages led to extensions and
* changes to better support more diverse usage contexts. Because
* most internal methods and nested classes are interrelated,
* their main rationale and descriptions are presented here;
* individual methods and nested classes contain only brief
* comments about details.
*
* WorkQueues
* ==========
*
* Most operations occur within work-stealing queues (in nested
* class WorkQueue). These are special forms of Deques that
* support only three of the four possible end-operations -- push,
* pop, and poll (aka steal), under the further constraints that
* push and pop are called only from the owning thread (or, as
* extended here, under a lock), while poll may be called from
* other threads. (If you are unfamiliar with them, you probably
* want to read Herlihy and Shavit's book "The Art of
* Multiprocessor programming", chapter 16 describing these in
* more detail before proceeding.) The main work-stealing queue
* design is roughly similar to those in the papers "Dynamic
* Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
* The main differences ultimately stem from GC requirements that
* we null out taken slots as soon as we can, to maintain as small
* a footprint as possible even in programs generating huge
* numbers of tasks. To accomplish this, we shift the CAS
* arbitrating pop vs poll (steal) from being on the indices
* ("base" and "top") to the slots themselves.
*
* Adding tasks then takes the form of a classic array push(task):
* q.array[q.top] = task; ++q.top;
*
* (The actual code needs to null-check and size-check the array,
* properly fence the accesses, and possibly signal waiting
* workers to start scanning -- see below.) Both a successful pop
* and poll mainly entail a CAS of a slot from non-null to null.
*
* The pop operation (always performed by owner) is:
* if ((base != top) and
* (the task at top slot is not null) and
* (CAS slot to null))
* decrement top and return task;
*
* And the poll operation (usually by a stealer) is
* if ((base != top) and
* (the task at base slot is not null) and
* (base has not changed) and
* (CAS slot to null))
* increment base and return task;
*
* Because we rely on CASes of references, we do not need tag bits
* on base or top. They are simple ints as used in any circular
* array-based queue (see for example ArrayDeque). Updates to the
* indices guarantee that top == base means the queue is empty,
* but otherwise may err on the side of possibly making the queue
* appear nonempty when a push, pop, or poll have not fully
* committed. (Method isEmpty() checks the case of a partially
* completed removal of the last element.) Because of this, the
* poll operation, considered individually, is not wait-free. One
* thief cannot successfully continue until another in-progress
* one (or, if previously empty, a push) completes. However, in
* the aggregate, we ensure at least probabilistic
* non-blockingness. If an attempted steal fails, a thief always
* chooses a different random victim target to try next. So, in
* order for one thief to progress, it suffices for any
* in-progress poll or new push on any empty queue to
* complete. (This is why we normally use method pollAt and its
* variants that try once at the apparent base index, else
* consider alternative actions, rather than method poll, which
* retries.)
*
* This approach also enables support of a user mode in which
* local task processing is in FIFO, not LIFO order, simply by
* using poll rather than pop. This can be useful in
* message-passing frameworks in which tasks are never joined.
* However neither mode considers affinities, loads, cache
* localities, etc, so rarely provide the best possible
* performance on a given machine, but portably provide good
* throughput by averaging over these factors. Further, even if
* we did try to use such information, we do not usually have a
* basis for exploiting it. For example, some sets of tasks
* profit from cache affinities, but others are harmed by cache
* pollution effects. Additionally, even though it requires
* scanning, long-term throughput is often best using random
* selection rather than directed selection policies, so cheap
* randomization of sufficient quality is used whenever
* applicable. Various Marsaglia XorShifts (some with different
* shift constants) are inlined at use points.
*
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* by workers. Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters. In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or in the case of CountedCompleters,
* others with the same root task). Insertion of tasks in shared
* mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using field
* qlock), because submitters encountering a busy queue move on to
* try or create other queues -- they block only when creating and
* registering new queues. Additionally, "qlock" saturates to an
* unlockable value (-1) at shutdown. Unlocking still can be and
* is performed by cheaper ordered writes of "qlock" in successful
* cases, but uses CAS in unsuccessful cases.
*
* Management
* ==========
*
* The main throughput advantages of work-stealing stem from
* decentralized control -- workers mostly take tasks from
* themselves or each other, at rates that can exceed a billion
* per second. The pool itself creates, activates (enables
* scanning for and running tasks), deactivates, blocks, and
* terminates threads, all with minimal central information.
* There are only a few properties that we can globally track or
* maintain, so we pack them into a small number of variables,
* often maintaining atomicity without blocking or locking.
* Nearly all essentially atomic control state is held in two
* volatile variables that are by far most often read (not
* written) as status and consistency checks. (Also, field
* "config" holds unchanging configuration state.)
*
* Field "ctl" contains 64 bits holding information needed to
* atomically decide to add, inactivate, enqueue (on an event
* queue), dequeue, and/or re-activate workers. To enable this
* packing, we restrict maximum parallelism to (1<<15)-1 (which is
* far in excess of normal operating range) to allow ids, counts,
* and their negations (used for thresholding) to fit into 16bit
* subfields.
*
* Field "runState" holds lockable state bits (STARTED, STOP, etc)
* also protecting updates to the workQueues array. When used as
* a lock, it is normally held only for a few instructions (the
* only exceptions are one-time array initialization and uncommon
* resizing), so is nearly always available after at most a brief
* spin. But to be extra-cautious, after spinning, method
* awaitRunStateLock (called only if an initial CAS fails), uses a
* wait/notify mechanics on a builtin monitor to block when
* (rarely) needed. This would be a terrible idea for a highly
* contended lock, but most pools run without the lock ever
* contending after the spin limit, so this works fine as a more
* conservative alternative. Because we don't otherwise have an
* internal Object to use as a monitor, the "stealCounter" (an
* AtomicLong) is used when available (it too must be lazily
* initialized; see externalSubmit).
*
* Usages of "runState" vs "ctl" interact in only one case:
* deciding to add a worker thread (see tryAddWorker), in which
* case the ctl CAS is performed while the lock is held.
*
* Recording WorkQueues. WorkQueues are recorded in the
* "workQueues" array. The array is created upon first use (see
* externalSubmit) and expanded if necessary. Updates to the
* array while recording new workers and unrecording terminated
* ones are protected from each other by the runState lock, but
* the array is otherwise concurrently readable, and accessed
* directly. We also ensure that reads of the array reference
* itself never become too stale. To simplify index-based
* operations, the array size is always a power of two, and all
* readers must tolerate null slots. Worker queues are at odd
* indices. Shared (submission) queues are at even indices, up to
* a maximum of 64 slots, to limit growth even if array needs to
* expand to add more workers. Grouping them together in this way
* simplifies and speeds up task scanning.
*
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
* compensation for blocked workers. However, all other support
* code is set up to work with other policies. To ensure that we
* do not hold on to worker references that would prevent GC, All
* accesses to workQueues are via indices into the workQueues
* array (which is one source of some of the messy code
* constructions here). In essence, the workQueues array serves as
* a weak reference mechanism. Thus for example the stack top
* subfield of ctl stores indices, not references.
*
* Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
* cannot let workers spin indefinitely scanning for tasks when
* none can be found immediately, and we cannot start/resume
* workers unless there appear to be tasks available. On the
* other hand, we must quickly prod them into action when new
* tasks are submitted or generated. In many usages, ramp-up time
* to activate workers is the main limiting factor in overall
* performance, which is compounded at program start-up by JIT
* compilation and allocation. So we streamline this as much as
* possible.
*
* The "ctl" field atomically maintains active and total worker
* counts as well as a queue to place waiting threads so they can
* be located for signalling. Active counts also play the role of
* quiescence indicators, so are decremented when workers believe
* that there are no more tasks to execute. The "queue" is
* actually a form of Treiber stack. A stack is ideal for
* activating threads in most-recently used order. This improves
* performance and locality, outweighing the disadvantages of
* being prone to contention and inability to release a worker
* unless it is topmost on stack. We park/unpark workers after
* pushing on the idle worker stack (represented by the lower
* 32bit subfield of ctl) when they cannot find work. The top
* stack state holds the value of the "scanState" field of the
* worker: its index and status, plus a version counter that, in
* addition to the count subfields (also serving as version
* stamps) provide protection against Treiber stack ABA effects.
*
* Field scanState is used by both workers and the pool to manage
* and track whether a worker is INACTIVE (possibly blocked
* waiting for a signal), or SCANNING for tasks (when neither hold
* it is busy running tasks). When a worker is inactivated, its
* scanState field is set, and is prevented from executing tasks,
* even though it must scan once for them to avoid queuing
* races. Note that scanState updates lag queue CAS releases so
* usage requires care. When queued, the lower 16 bits of
* scanState must hold its pool index. So we place the index there
* upon initialization (see registerWorker) and otherwise keep it
* there or restore it when necessary.
*
* Memory ordering. See "Correct and Efficient Work-Stealing for
* Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
* (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
* analysis of memory ordering requirements in work-stealing
* algorithms similar to the one used here. We usually need
* stronger than minimal ordering because we must sometimes signal
* workers, requiring Dekker-like full-fences to avoid lost
* signals. Arranging for enough ordering without expensive
* over-fencing requires tradeoffs among the supported means of
* expressing access constraints. The most central operations,
* taking from queues and updating ctl state, require full-fence
* CAS. Array slots are read using the emulation of volatiles
* provided by Unsafe. Access from other threads to WorkQueue
* base, top, and array requires a volatile load of the first of
* any of these read. We use the convention of declaring the
* "base" index volatile, and always read it before other fields.
* The owner thread must ensure ordered updates, so writes use
* ordered intrinsics unless they can piggyback on those for other
* writes. Similar conventions and rationales hold for other
* WorkQueue fields (such as "currentSteal") that are only written
* by owners but observed by others.
*
* Creating workers. To create a worker, we pre-increment total
* count (serving as a reservation), and attempt to construct a
* ForkJoinWorkerThread via its factory. Upon construction, the
* new thread invokes registerWorker, where it constructs a
* WorkQueue and is assigned an index in the workQueues array
* (expanding the array if necessary). The thread is then
* started. Upon any exception across these steps, or null return
* from factory, deregisterWorker adjusts counts and records
* accordingly. If a null return, the pool continues running with
* fewer than the target number workers. If exceptional, the
* exception is propagated, generally to some external caller.
* Worker index assignment avoids the bias in scanning that would
* occur if entries were sequentially packed starting at the front
* of the workQueues array. We treat the array as a simple
* power-of-two hash table, expanding as needed. The seedIndex
* increment ensures no collisions until a resize is needed or a
* worker is deregistered and replaced, and thereafter keeps
* probability of collision low. We cannot use
* ThreadLocalRandom.getProbe() for similar purposes here because
* the thread has not started yet, but do so for creating
* submission queues for existing external threads.
*
* Deactivation and waiting. Queuing encounters several intrinsic
* races; most notably that a task-producing thread can miss
* seeing (and signalling) another thread that gave up looking for
* work but has not yet entered the wait queue. When a worker
* cannot find a task to steal, it deactivates and enqueues. Very
* often, the lack of tasks is transient due to GC or OS
* scheduling. To reduce false-alarm deactivation, scanners
* compute checksums of queue states during sweeps. (The
* stability checks used here and elsewhere are probabilistic
* variants of snapshot techniques -- see Herlihy & Shavit.)
* Workers give up and try to deactivate only after the sum is
* stable across scans. Further, to avoid missed signals, they
* repeat this scanning process after successful enqueuing until
* again stable. In this state, the worker cannot take/run a task
* it sees until it is released from the queue, so the worker
* itself eventually tries to release itself or any successor (see
* tryRelease). Otherwise, upon an empty scan, a deactivated
* worker uses an adaptive local spin construction (see awaitWork)
* before blocking (via park). Note the unusual conventions about
* Thread.interrupts surrounding parking and other blocking:
* Because interrupts are used solely to alert threads to check
* termination, which is checked anyway upon blocking, we clear
* status (using Thread.interrupted) before any call to park, so
* that park does not immediately return due to status being set
* via some other unrelated call to interrupt in user code.
*
* Signalling and activation. Workers are created or activated
* only when there appears to be at least one task they might be
* able to find and execute. Upon push (either by a worker or an
* external submission) to a previously (possibly) empty queue,
* workers are signalled if idle, or created if fewer exist than
* the given parallelism level. These primary signals are
* buttressed by others whenever other threads remove a task from
* a queue and notice that there are other tasks there as well.
* On most platforms, signalling (unpark) overhead time is
* noticeably long, and the time between signalling a thread and
* it actually making progress can be very noticeably long, so it
* is worth offloading these delays from critical paths as much as
* possible. Also, because inactive workers are often rescanning
* or spinning rather than blocking, we set and clear the "parker"
* field of WorkQueues to reduce unnecessary calls to unpark.
* (This requires a secondary recheck to avoid missed signals.)
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate (see awaitWork) if the pool has remained
* quiescent for period IDLE_TIMEOUT, increasing the period as the
* number of threads decreases, eventually removing all workers.
* Also, when more than two spare threads exist, excess threads
* are immediately terminated at the next quiescent point.
* (Padding by two avoids hysteresis.)
*
* Shutdown and Termination. A call to shutdownNow invokes
* tryTerminate to atomically set a runState bit. The calling
* thread, as well as every other worker thereafter terminating,
* helps terminate others by setting their (qlock) status,
* cancelling their unprocessed tasks, and waking them up, doing
* so repeatedly until stable (but with a loop bounded by the
* number of workers). Calls to non-abrupt shutdown() preface
* this by checking whether termination should commence. This
* relies primarily on the active count bits of "ctl" maintaining
* consensus -- tryTerminate is called from awaitWork whenever
* quiescent. However, external submitters do not take part in
* this consensus. So, tryTerminate sweeps through queues (until
* stable) to ensure lack of in-flight submissions and workers
* about to process them before triggering the "STOP" phase of
* termination. (Note: there is an intrinsic conflict if
* helpQuiescePool is called when shutdown is enabled. Both wait
* for quiescence, but tryTerminate is biased to not trigger until
* helpQuiescePool completes.)
*
*
* Joining Tasks
* =============
*
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* are multiplexing many tasks on to a pool of workers, we can't
* just let them block (as in Thread.join). We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
* possible is not necessarily a good idea since we may need both
* an unblocked task and its continuation to progress. Instead we
* combine two tactics:
*
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
*
* Compensating: Unless there are already enough live threads,
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
*
* A third form (implemented in tryRemoveAndExec) amounts to
* helping a hypothetical compensator: If we can readily tell that
* a possible action of a compensator is to steal and execute the
* task being joined, the joining thread can do so directly,
* without the need for a compensation thread (although at the
* expense of larger run-time stacks, but the tradeoff is
* typically worthwhile).
*
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
*
* The algorithm in helpStealer entails a form of "linear
* helping". Each worker records (in field currentSteal) the most
* recent task it stole from some other worker (or a submission).
* It also records (in field currentJoin) the task it is currently
* actively joining. Method helpStealer uses these markers to try
* to find a worker to help (i.e., steal back a task from and
* execute it) that could hasten completion of the actively joined
* task. Thus, the joiner executes a task that would be on its
* own local deque had the to-be-joined task not been stolen. This
* is a conservative variant of the approach described in Wagner &
* Calder "Leapfrogging: a portable technique for implementing
* efficient futures" SIGPLAN Notices, 1993
* (http://portal.acm.org/citation.cfm?id=155354). It differs in
* that: (1) We only maintain dependency links across workers upon
* steals, rather than use per-task bookkeeping. This sometimes
* requires a linear scan of workQueues array to locate stealers,
* but often doesn't because stealers leave hints (that may become
* stale/wrong) of where to locate them. It is only a hint
* because a worker might have had multiple steals and the hint
* records only one of them (usually the most current). Hinting
* isolates cost to when it is needed, rather than adding to
* per-task overhead. (2) It is "shallow", ignoring nesting and
* potentially cyclic mutual steals. (3) It is intentionally
* racy: field currentJoin is updated only while actively joining,
* which means that we miss links in the chain during long-lived
* tasks, GC stalls etc (which is OK since blocking in such cases
* is usually a good idea). (4) We bound the number of attempts
* to find work using checksums and fall back to suspending the
* worker and if necessary replacing it with another.
*
* Helping actions for CountedCompleters do not require tracking
* currentJoins: Method helpComplete takes and executes any task
* with the same root as the task being waited on (preferring
* local pops to non-local polls). However, this still entails
* some traversal of completer chains, so is less efficient than
* using CountedCompleters without explicit joins.
*
* Compensation does not aim to keep exactly the target
* parallelism number of unblocked threads running at any given
* time. Some previous versions of this class employed immediate
* compensations for any blocked join. However, in practice, the
* vast majority of blockages are transient byproducts of GC and
* other JVM or OS activities that are made worse by replacement.
* Currently, compensation is attempted only after validating that
* all purportedly active threads are processing tasks by checking
* field WorkQueue.scanState, which eliminates most false
* positives. Also, compensation is bypassed (tolerating fewer
* threads) in the most common case in which it is rarely
* beneficial: when a worker with an empty queue (thus no
* continuation tasks) blocks on a join and there still remain
* enough threads to ensure liveness.
*
* The compensation mechanism may be bounded. Bounds for the
* commonPool (see commonMaxSpares) better enable JVMs to cope
* with programming errors and abuse before running out of
* resources to do so. In other cases, users may supply factories
* that limit thread construction. The effects of bounding in this
* pool (like all others) is imprecise. Total worker counts are
* decremented when threads deregister, not when they exit and
* resources are reclaimed by the JVM and OS. So the number of
* simultaneously live threads may transiently exceed bounds.
*
* Common Pool
* ===========
*
* The static common pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
* allocation. Most bootstrapping occurs within method
* externalSubmit during the first submission to the pool.
*
* When external threads submit to the common pool, they can
* perform subtask processing (see externalHelpComplete and
* related methods) upon joins. This caller-helps policy makes it
* sensible to set common pool parallelism level to one (or more)
* less than the total number of available cores, or even zero for
* pure caller-runs. We do not need to record whether external
* submissions are to the common pool -- if not, external help
* methods return quickly. These submitters would otherwise be
* blocked waiting for completion, so the extra effort (with
* liberally sprinkled task status checks) in inapplicable cases
* amounts to an odd form of limited spin-wait before blocking in
* ForkJoinTask.join.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
* after executing any top-level task (see WorkQueue.runTask).
* The associated mechanics (mainly in ForkJoinWorkerThread) may
* be JVM-dependent and must access particular Thread class fields
* to achieve this effect.
*
* Style notes
* ===========
*
* Memory ordering relies mainly on Unsafe intrinsics that carry
* the further responsibility of explicitly performing null- and
* bounds- checks otherwise carried out implicitly by JVMs. This
* can be awkward and ugly, but also reflects the need to control
* outcomes across the unusual cases that arise in very racy code
* with very few invariants. So these explicit checks would exist
* in some form anyway. All fields are read into locals before
* use, and null-checked if they are references. This is usually
* done in a "C"-like style of listing declarations at the heads
* of methods or blocks, and using inline assignments on first
* encounter. Array bounds-checks are usually performed by
* masking with array.length-1, which relies on the invariant that
* these arrays are created with positive lengths, which is itself
* paranoically checked. Nearly all explicit checks lead to
* bypass/return, not exception throws, because they may
* legitimately arise due to cancellation/revocation during
* shutdown.
*
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
* fields of WorkQueue maintain data structures managed by
* ForkJoinPool, so are directly accessed. There is little point
* trying to reduce this, since any associated future changes in
* representations will need to be accompanied by algorithmic
* changes anyway. Several methods intrinsically sprawl because
* they must accumulate sets of consistent reads of fields held in
* local variables. There are also other coding oddities
* (including several unnecessary-looking hoisted null checks)
* that help some methods perform reasonably even when interpreted
* (not compiled).
*
* The order of declarations in this file is (with a few exceptions):
* (1) Static utility functions
* (2) Nested (static) classes
* (3) Static fields
* (4) Fields, along with constants used when unpacking some of them
* (5) Internal control methods
* (6) Callbacks and other support for ForkJoinTask methods
* (7) Exported methods
* (8) Static block initializing statics in minimally dependent order
*/
// Static utilities
/**
* If there is a security manager, makes sure caller has
* permission to modify threads.
*/
private static void
checkPermission() {
SecurityManager security =
System.
getSecurityManager();
if (
security != null)
security.
checkPermission(
modifyThreadPermission);
}
// Nested classes
/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
* for {@code ForkJoinWorkerThread} subclasses that extend base
* functionality or initialize threads with different contexts.
*/
public static interface
ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @return the new worker thread
* @throws NullPointerException if the pool is null
*/
public
ForkJoinWorkerThread newThread(
ForkJoinPool pool);
}
/**
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static final class
DefaultForkJoinWorkerThreadFactory
implements
ForkJoinWorkerThreadFactory {
public final
ForkJoinWorkerThread newThread(
ForkJoinPool pool) {
return new
ForkJoinWorkerThread(
pool);
}
}
/**
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to
* actually do anything beyond having a unique identity.
*/
static final class
EmptyTask extends
ForkJoinTask<
Void> {
private static final long
serialVersionUID = -7721805057305804111L;
EmptyTask() {
status =
ForkJoinTask.
NORMAL; } // force done
public final
Void getRawResult() { return null; }
public final void
setRawResult(
Void x) {}
public final boolean
exec() { return true; }
}
// Constants shared across ForkJoinPool and WorkQueue
// Bounds
static final int
SMASK = 0xffff; // short bits == max index
static final int
MAX_CAP = 0x7fff; // max #workers - 1
static final int
EVENMASK = 0xfffe; // even short bits
static final int
SQMASK = 0x007e; // max 64 (even) slots
// Masks and units for WorkQueue.scanState and ctl sp subfield
static final int
SCANNING = 1; // false when running tasks
static final int
INACTIVE = 1 << 31; // must be negative
static final int
SS_SEQ = 1 << 16; // version count
// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int
MODE_MASK = 0xffff << 16; // top half of int
static final int
LIFO_QUEUE = 0;
static final int
FIFO_QUEUE = 1 << 16;
static final int
SHARED_QUEUE = 1 << 31; // must be negative
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. The @Contended annotation alerts
* JVMs to try to keep instances apart.
*/
@sun.misc.
Contended
static final class
WorkQueue {
/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
static final int
INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int
MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int
scanState; // versioned, <0: inactive; odd:scanning
int
stackPred; // pool stack (ctl) predecessor
int
nsteals; // number of steals
int
hint; // randomization and stealer index hint
int
config; // pool index and mode
volatile int
qlock; // 1: locked, < 0: terminate; else 0
volatile int
base; // index of next slot for poll
int
top; // index of next slot for push
ForkJoinTask<?>[]
array; // the elements (initially unallocated)
final
ForkJoinPool pool; // the containing pool (may be null)
final
ForkJoinWorkerThread owner; // owning thread or null if shared
volatile
Thread parker; // == owner during call to park; else null
volatile
ForkJoinTask<?>
currentJoin; // task being joined in awaitJoin
volatile
ForkJoinTask<?>
currentSteal; // mainly used by helpStealer
WorkQueue(
ForkJoinPool pool,
ForkJoinWorkerThread owner) {
this.
pool =
pool;
this.
owner =
owner;
// Place indices in the center of array (that is not yet allocated)
base =
top =
INITIAL_QUEUE_CAPACITY >>> 1;
}
/**
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
final int
getPoolIndex() {
return (
config & 0xffff) >>> 1; // ignore odd/even tag bit
}
/**
* Returns the approximate number of tasks in the queue.
*/
final int
queueSize() {
int
n =
base -
top; // non-owner callers must read base first
return (
n >= 0) ? 0 : -
n; // ignore transient negative
}
/**
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
*/
final boolean
isEmpty() {
ForkJoinTask<?>[]
a; int
n,
m,
s;
return ((
n =
base - (
s =
top)) >= 0 ||
(
n == -1 && // possibly one task
((
a =
array) == null || (
m =
a.length - 1) < 0 ||
U.
getObject
(
a, (long)((
m & (
s - 1)) <<
ASHIFT) +
ABASE) == null)));
}
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void
push(
ForkJoinTask<?>
task) {
ForkJoinTask<?>[]
a;
ForkJoinPool p;
int
b =
base,
s =
top,
n;
if ((
a =
array) != null) { // ignore if queue removed
int
m =
a.length - 1; // fenced write for task visibility
U.
putOrderedObject(
a, ((
m &
s) <<
ASHIFT) +
ABASE,
task);
U.
putOrderedInt(this,
QTOP,
s + 1);
if ((
n =
s -
b) <= 1) {
if ((
p =
pool) != null)
p.
signalWork(
p.
workQueues, this);
}
else if (
n >=
m)
growArray();
}
}
/**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
*/
final
ForkJoinTask<?>[]
growArray() {
ForkJoinTask<?>[]
oldA =
array;
int
size =
oldA != null ?
oldA.length << 1 :
INITIAL_QUEUE_CAPACITY;
if (
size >
MAXIMUM_QUEUE_CAPACITY)
throw new
RejectedExecutionException("Queue capacity exceeded");
int
oldMask,
t,
b;
ForkJoinTask<?>[]
a =
array = new
ForkJoinTask<?>[
size];
if (
oldA != null && (
oldMask =
oldA.length - 1) >= 0 &&
(
t =
top) - (
b =
base) > 0) {
int
mask =
size - 1;
do { // emulate poll from old array, push to new array
ForkJoinTask<?>
x;
int
oldj = ((
b &
oldMask) <<
ASHIFT) +
ABASE;
int
j = ((
b &
mask) <<
ASHIFT) +
ABASE;
x = (
ForkJoinTask<?>)
U.
getObjectVolatile(
oldA,
oldj);
if (
x != null &&
U.
compareAndSwapObject(
oldA,
oldj,
x, null))
U.
putObjectVolatile(
a,
j,
x);
} while (++
b !=
t);
}
return
a;
}
/**
* Takes next task, if one exists, in LIFO order. Call only
* by owner in unshared queues.
*/
final
ForkJoinTask<?>
pop() {
ForkJoinTask<?>[]
a;
ForkJoinTask<?>
t; int
m;
if ((
a =
array) != null && (
m =
a.length - 1) >= 0) {
for (int
s; (
s =
top - 1) -
base >= 0;) {
long
j = ((
m &
s) <<
ASHIFT) +
ABASE;
if ((
t = (
ForkJoinTask<?>)
U.
getObject(
a,
j)) == null)
break;
if (
U.
compareAndSwapObject(
a,
j,
t, null)) {
U.
putOrderedInt(this,
QTOP,
s);
return
t;
}
}
}
return null;
}
/**
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
* appear in ForkJoinPool methods scan and helpStealer.
*/
final
ForkJoinTask<?>
pollAt(int
b) {
ForkJoinTask<?>
t;
ForkJoinTask<?>[]
a;
if ((
a =
array) != null) {
int
j = (((
a.length - 1) &
b) <<
ASHIFT) +
ABASE;
if ((
t = (
ForkJoinTask<?>)
U.
getObjectVolatile(
a,
j)) != null &&
base ==
b &&
U.
compareAndSwapObject(
a,
j,
t, null)) {
base =
b + 1;
return
t;
}
}
return null;
}
/**
* Takes next task, if one exists, in FIFO order.
*/
final
ForkJoinTask<?>
poll() {
ForkJoinTask<?>[]
a; int
b;
ForkJoinTask<?>
t;
while ((
b =
base) -
top < 0 && (
a =
array) != null) {
int
j = (((
a.length - 1) &
b) <<
ASHIFT) +
ABASE;
t = (
ForkJoinTask<?>)
U.
getObjectVolatile(
a,
j);
if (
base ==
b) {
if (
t != null) {
if (
U.
compareAndSwapObject(
a,
j,
t, null)) {
base =
b + 1;
return
t;
}
}
else if (
b + 1 ==
top) // now empty
break;
}
}
return null;
}
/**
* Takes next task, if one exists, in order specified by mode.
*/
final
ForkJoinTask<?>
nextLocalTask() {
return (
config &
FIFO_QUEUE) == 0 ?
pop() :
poll();
}
/**
* Returns next task, if one exists, in order specified by mode.
*/
final
ForkJoinTask<?>
peek() {
ForkJoinTask<?>[]
a =
array; int
m;
if (
a == null || (
m =
a.length - 1) < 0)
return null;
int
i = (
config &
FIFO_QUEUE) == 0 ?
top - 1 :
base;
int
j = ((
i &
m) <<
ASHIFT) +
ABASE;
return (
ForkJoinTask<?>)
U.
getObjectVolatile(
a,
j);
}
/**
* Pops the given task only if it is at the current top.
* (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean
tryUnpush(
ForkJoinTask<?>
t) {
ForkJoinTask<?>[]
a; int
s;
if ((
a =
array) != null && (
s =
top) !=
base &&
U.
compareAndSwapObject
(
a, (((
a.length - 1) & --
s) <<
ASHIFT) +
ABASE,
t, null)) {
U.
putOrderedInt(this,
QTOP,
s);
return true;
}
return false;
}
/**
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void
cancelAll() {
ForkJoinTask<?>
t;
if ((
t =
currentJoin) != null) {
currentJoin = null;
ForkJoinTask.
cancelIgnoringExceptions(
t);
}
if ((
t =
currentSteal) != null) {
currentSteal = null;
ForkJoinTask.
cancelIgnoringExceptions(
t);
}
while ((
t =
poll()) != null)
ForkJoinTask.
cancelIgnoringExceptions(
t);
}
// Specialized execution methods
/**
* Polls and runs tasks until empty.
*/
final void
pollAndExecAll() {
for (
ForkJoinTask<?>
t; (
t =
poll()) != null;)
t.
doExec();
}
/**
* Removes and executes all local tasks. If LIFO, invokes
* pollAndExecAll. Otherwise implements a specialized pop loop
* to exec until empty.
*/
final void
execLocalTasks() {
int
b =
base,
m,
s;
ForkJoinTask<?>[]
a =
array;
if (
b - (
s =
top - 1) <= 0 &&
a != null &&
(
m =
a.length - 1) >= 0) {
if ((
config &
FIFO_QUEUE) == 0) {
for (
ForkJoinTask<?>
t;;) {
if ((
t = (
ForkJoinTask<?>)
U.
getAndSetObject
(
a, ((
m &
s) <<
ASHIFT) +
ABASE, null)) == null)
break;
U.
putOrderedInt(this,
QTOP,
s);
t.
doExec();
if (
base - (
s =
top - 1) > 0)
break;
}
}
else
pollAndExecAll();
}
}
/**
* Executes the given task and any remaining local tasks.
*/
final void
runTask(
ForkJoinTask<?>
task) {
if (
task != null) {
scanState &= ~
SCANNING; // mark as busy
(
currentSteal =
task).
doExec();
U.
putOrderedObject(this,
QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread =
owner;
if (++
nsteals < 0) // collect on overflow
transferStealCount(
pool);
scanState |=
SCANNING;
if (
thread != null)
thread.
afterTopLevelExec();
}
}
/**
* Adds steal count to pool stealCounter if it exists, and resets.
*/
final void
transferStealCount(
ForkJoinPool p) {
AtomicLong sc;
if (
p != null && (
sc =
p.
stealCounter) != null) {
int
s =
nsteals;
nsteals = 0; // if negative, correct for overflow
sc.
getAndAdd((long)(
s < 0 ?
Integer.
MAX_VALUE :
s));
}
}
/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Used only by awaitJoin.
*
* @return true if queue empty and task not known to be done
*/
final boolean
tryRemoveAndExec(
ForkJoinTask<?>
task) {
ForkJoinTask<?>[]
a; int
m,
s,
b,
n;
if ((
a =
array) != null && (
m =
a.length - 1) >= 0 &&
task != null) {
while ((
n = (
s =
top) - (
b =
base)) > 0) {
for (
ForkJoinTask<?>
t;;) { // traverse from s to b
long
j = ((--
s &
m) <<
ASHIFT) +
ABASE;
if ((
t = (
ForkJoinTask<?>)
U.
getObject(
a,
j)) == null)
return
s + 1 ==
top; // shorter than expected
else if (
t ==
task) {
boolean
removed = false;
if (
s + 1 ==
top) { // pop
if (
U.
compareAndSwapObject(
a,
j,
task, null)) {
U.
putOrderedInt(this,
QTOP,
s);
removed = true;
}
}
else if (
base ==
b) // replace with proxy
removed =
U.
compareAndSwapObject(
a,
j,
task, new
EmptyTask());
if (
removed)
task.
doExec();
break;
}
else if (
t.
status < 0 &&
s + 1 ==
top) {
if (
U.
compareAndSwapObject(
a,
j,
t, null))
U.
putOrderedInt(this,
QTOP,
s);
break; // was cancelled
}
if (--
n == 0)
return false;
}
if (
task.
status < 0)
return false;
}
}
return true;
}
/**
* Pops task if in the same CC computation as the given task,
* in either shared or owned mode. Used only by helpComplete.
*/
final
CountedCompleter<?>
popCC(
CountedCompleter<?>
task, int
mode) {
int
s;
ForkJoinTask<?>[]
a;
Object o;
if (
base - (
s =
top) < 0 && (
a =
array) != null) {
long
j = (((
a.length - 1) & (
s - 1)) <<
ASHIFT) +
ABASE;
if ((
o =
U.
getObjectVolatile(
a,
j)) != null &&
(
o instanceof
CountedCompleter)) {
CountedCompleter<?>
t = (
CountedCompleter<?>)
o;
for (
CountedCompleter<?>
r =
t;;) {
if (
r ==
task) {
if (
mode < 0) { // must lock
if (
U.
compareAndSwapInt(this,
QLOCK, 0, 1)) {
if (
top ==
s &&
array ==
a &&
U.
compareAndSwapObject(
a,
j,
t, null)) {
U.
putOrderedInt(this,
QTOP,
s - 1);
U.
putOrderedInt(this,
QLOCK, 0);
return
t;
}
U.
compareAndSwapInt(this,
QLOCK, 1, 0);
}
}
else if (
U.
compareAndSwapObject(
a,
j,
t, null)) {
U.
putOrderedInt(this,
QTOP,
s - 1);
return
t;
}
break;
}
else if ((
r =
r.
completer) == null) // try parent
break;
}
}
}
return null;
}
/**
* Steals and runs a task in the same CC computation as the
* given task if one exists and can be taken without
* contention. Otherwise returns a checksum/control value for
* use by method helpComplete.
*
* @return 1 if successful, 2 if retryable (lost to another
* stealer), -1 if non-empty but no matching task found, else
* the base index, forced negative.
*/
final int
pollAndExecCC(
CountedCompleter<?>
task) {
int
b,
h;
ForkJoinTask<?>[]
a;
Object o;
if ((
b =
base) -
top >= 0 || (
a =
array) == null)
h =
b |
Integer.
MIN_VALUE; // to sense movement on re-poll
else {
long
j = (((
a.length - 1) &
b) <<
ASHIFT) +
ABASE;
if ((
o =
U.
getObjectVolatile(
a,
j)) == null)
h = 2; // retryable
else if (!(
o instanceof
CountedCompleter))
h = -1; // unmatchable
else {
CountedCompleter<?>
t = (
CountedCompleter<?>)
o;
for (
CountedCompleter<?>
r =
t;;) {
if (
r ==
task) {
if (
base ==
b &&
U.
compareAndSwapObject(
a,
j,
t, null)) {
base =
b + 1;
t.
doExec();
h = 1; // success
}
else
h = 2; // lost CAS
break;
}
else if ((
r =
r.
completer) == null) {
h = -1; // unmatched
break;
}
}
}
}
return
h;
}
/**
* Returns true if owned and not known to be blocked.
*/
final boolean
isApparentlyUnblocked() {
Thread wt;
Thread.
State s;
return (
scanState >= 0 &&
(
wt =
owner) != null &&
(
s =
wt.
getState()) !=
Thread.
State.
BLOCKED &&
s !=
Thread.
State.
WAITING &&
s !=
Thread.
State.
TIMED_WAITING);
}
// Unsafe mechanics. Note that some are (and must be) the same as in FJP
private static final sun.misc.
Unsafe U;
private static final int
ABASE;
private static final int
ASHIFT;
private static final long
QTOP;
private static final long
QLOCK;
private static final long
QCURRENTSTEAL;
static {
try {
U = sun.misc.
Unsafe.
getUnsafe();
Class<?>
wk =
WorkQueue.class;
Class<?>
ak =
ForkJoinTask[].class;
QTOP =
U.
objectFieldOffset
(
wk.
getDeclaredField("top"));
QLOCK =
U.
objectFieldOffset
(
wk.
getDeclaredField("qlock"));
QCURRENTSTEAL =
U.
objectFieldOffset
(
wk.
getDeclaredField("currentSteal"));
ABASE =
U.
arrayBaseOffset(
ak);
int
scale =
U.
arrayIndexScale(
ak);
if ((
scale & (
scale - 1)) != 0)
throw new
Error("data type scale not a power of two");
ASHIFT = 31 -
Integer.
numberOfLeadingZeros(
scale);
} catch (
Exception e) {
throw new
Error(
e);
}
}
}
// static fields (initialized in static initializer below)
/**
* Creates a new ForkJoinWorkerThread. This factory is used unless
* overridden in ForkJoinPool constructors.
*/
public static final
ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
/**
* Permission required for callers of methods that may start or
* kill threads.
*/
private static final
RuntimePermission modifyThreadPermission;
/**
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
static final
ForkJoinPool common;
/**
* Common pool parallelism. To allow simpler use and management
* when common pool threads are disabled, we allow the underlying
* common.parallelism field to be zero, but in that case still report
* parallelism as 1 to reflect resulting caller-runs mechanics.
*/
static final int
commonParallelism;
/**
* Limit on spare thread construction in tryCompensate.
*/
private static int
commonMaxSpares;
/**
* Sequence number for creating workerNamePrefix.
*/
private static int
poolNumberSequence;
/**
* Returns the next sequence number. We don't expect this to
* ever contend, so use simple builtin sync.
*/
private static final synchronized int
nextPoolId() {
return ++
poolNumberSequence;
}
// static configuration constants
/**
* Initial timeout value (in nanoseconds) for the thread
* triggering quiescence to park waiting for new work. On timeout,
* the thread will instead try to shrink the number of
* workers. The value should be large enough to avoid overly
* aggressive shrinkage during most transient stalls (long GCs
* etc).
*/
private static final long
IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
/**
* Tolerance for idle timeouts, to cope with timer undershoots
*/
private static final long
TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
/**
* The initial value for commonMaxSpares during static
* initialization. The value is far in excess of normal
* requirements, but also far short of MAX_CAP and typical
* OS thread limits, so allows JVMs to catch misuse/abuse
* before running out of resources needed to do so.
*/
private static final int
DEFAULT_COMMON_MAX_SPARES = 256;
/**
* Number of times to spin-wait before blocking. The spins (in
* awaitRunStateLock and awaitWork) currently use randomized
* spins. Currently set to zero to reduce CPU usage.
*
* If greater than zero the value of SPINS must be a power
* of two, at least 4. A value of 2048 causes spinning for a
* small fraction of typical context-switch times.
*
* If/when MWAIT-like intrinsics becomes available, they
* may allow quieter spinning.
*/
private static final int
SPINS = 0;
/**
* Increment for seed generators. See class ThreadLocal for
* explanation.
*/
private static final int
SEED_INCREMENT = 0x9e3779b9;
/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl. The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers. When sp is non-zero, there are waiting workers. To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join. Other updates entail multiple subfields
* and masking, requiring CAS.
*/
// Lower and upper word masks
private static final long
SP_MASK = 0xffffffffL;
private static final long
UC_MASK = ~
SP_MASK;
// Active counts
private static final int
AC_SHIFT = 48;
private static final long
AC_UNIT = 0x0001L <<
AC_SHIFT;
private static final long
AC_MASK = 0xffffL <<
AC_SHIFT;
// Total counts
private static final int
TC_SHIFT = 32;
private static final long
TC_UNIT = 0x0001L <<
TC_SHIFT;
private static final long
TC_MASK = 0xffffL <<
TC_SHIFT;
private static final long
ADD_WORKER = 0x0001L << (
TC_SHIFT + 15); // sign
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int
RSLOCK = 1;
private static final int
RSIGNAL = 1 << 1;
private static final int
STARTED = 1 << 2;
private static final int
STOP = 1 << 29;
private static final int
TERMINATED = 1 << 30;
private static final int
SHUTDOWN = 1 << 31;
// Instance fields
volatile long
ctl; // main pool control
volatile int
runState; // lockable status
final int
config; // parallelism, mode
int
indexSeed; // to generate worker index
volatile
WorkQueue[]
workQueues; // main registry
final
ForkJoinWorkerThreadFactory factory;
final
UncaughtExceptionHandler ueh; // per-worker UEH
final
String workerNamePrefix; // to create worker name string
volatile
AtomicLong stealCounter; // also used as sync monitor
/**
* Acquires the runState lock; returns current (locked) runState.
*/
private int
lockRunState() {
int
rs;
return ((((
rs =
runState) &
RSLOCK) != 0 ||
!
U.
compareAndSwapInt(this,
RUNSTATE,
rs,
rs |=
RSLOCK)) ?
awaitRunStateLock() :
rs);
}
/**
* Spins and/or blocks until runstate lock is available. See
* above for explanation.
*/
private int
awaitRunStateLock() {
Object lock;
boolean
wasInterrupted = false;
for (int
spins =
SPINS,
r = 0,
rs,
ns;;) {
if (((
rs =
runState) &
RSLOCK) == 0) {
if (
U.
compareAndSwapInt(this,
RUNSTATE,
rs,
ns =
rs |
RSLOCK)) {
if (
wasInterrupted) {
try {
Thread.
currentThread().
interrupt();
} catch (
SecurityException ignore) {
}
}
return
ns;
}
}
else if (
r == 0)
r =
ThreadLocalRandom.
nextSecondarySeed();
else if (
spins > 0) {
r ^=
r << 6;
r ^=
r >>> 21;
r ^=
r << 7; // xorshift
if (
r >= 0)
--
spins;
}
else if ((
rs &
STARTED) == 0 || (
lock =
stealCounter) == null)
Thread.
yield(); // initialization race
else if (
U.
compareAndSwapInt(this,
RUNSTATE,
rs,
rs |
RSIGNAL)) {
synchronized (
lock) {
if ((
runState &
RSIGNAL) != 0) {
try {
lock.
wait();
} catch (
InterruptedException ie) {
if (!(
Thread.
currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true;
}
}
else
lock.
notifyAll();
}
}
}
}
/**
* Unlocks and sets runState to newRunState.
*
* @param oldRunState a value returned from lockRunState
* @param newRunState the next value (must have lock bit clear).
*/
private void
unlockRunState(int
oldRunState, int
newRunState) {
if (!
U.
compareAndSwapInt(this,
RUNSTATE,
oldRunState,
newRunState)) {
Object lock =
stealCounter;
runState =
newRunState; // clears RSIGNAL bit
if (
lock != null)
synchronized (
lock) {
lock.
notifyAll(); }
}
}
// Creating, registering and deregistering workers
/**
* Tries to construct and start one worker. Assumes that total
* count has already been incremented as a reservation. Invokes
* deregisterWorker on any failure.
*
* @return true if successful
*/
private boolean
createWorker() {
ForkJoinWorkerThreadFactory fac =
factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (
fac != null && (
wt =
fac.
newThread(this)) != null) {
wt.
start();
return true;
}
} catch (
Throwable rex) {
ex =
rex;
}
deregisterWorker(
wt,
ex);
return false;
}
/**
* Tries to add one worker, incrementing ctl counts before doing
* so, relying on createWorker to back out on failure.
*
* @param c incoming ctl value, with total count negative and no
* idle workers. On CAS failure, c is refreshed and retried if
* this holds (otherwise, a new worker is not needed).
*/
private void
tryAddWorker(long
c) {
boolean
add = false;
do {
long
nc = ((
AC_MASK & (
c +
AC_UNIT)) |
(
TC_MASK & (
c +
TC_UNIT)));
if (
ctl ==
c) {
int
rs,
stop; // check if terminating
if ((
stop = (
rs =
lockRunState()) &
STOP) == 0)
add =
U.
compareAndSwapLong(this,
CTL,
c,
nc);
unlockRunState(
rs,
rs & ~
RSLOCK);
if (
stop != 0)
break;
if (
add) {
createWorker();
break;
}
}
} while (((
c =
ctl) &
ADD_WORKER) != 0L && (int)
c == 0);
}
/**
* Callback from ForkJoinWorkerThread constructor to establish and
* record its WorkQueue.
*
* @param wt the worker thread
* @return the worker's queue
*/
final
WorkQueue registerWorker(
ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.
setDaemon(true); // configure thread
if ((
handler =
ueh) != null)
wt.
setUncaughtExceptionHandler(
handler);
WorkQueue w = new
WorkQueue(this,
wt);
int
i = 0; // assign a pool index
int
mode =
config &
MODE_MASK;
int
rs =
lockRunState();
try {
WorkQueue[]
ws; int
n; // skip if no array
if ((
ws =
workQueues) != null && (
n =
ws.length) > 0) {
int
s =
indexSeed +=
SEED_INCREMENT; // unlikely to collide
int
m =
n - 1;
i = ((
s << 1) | 1) &
m; // odd-numbered indices
if (
ws[
i] != null) { // collision
int
probes = 0; // step by approx half n
int
step = (
n <= 4) ? 2 : ((
n >>> 1) &
EVENMASK) + 2;
while (
ws[
i = (
i +
step) &
m] != null) {
if (++
probes >=
n) {
workQueues =
ws =
Arrays.
copyOf(
ws,
n <<= 1);
m =
n - 1;
probes = 0;
}
}
}
w.
hint =
s; // use as random seed
w.
config =
i |
mode;
w.
scanState =
i; // publication fence
ws[
i] =
w;
}
} finally {
unlockRunState(
rs,
rs & ~
RSLOCK);
}
wt.
setName(
workerNamePrefix.
concat(
Integer.
toString(
i >>> 1)));
return
w;
}
/**
* Final callback from terminating worker, as well as upon failure
* to construct or start a worker. Removes record of worker from
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
* @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void
deregisterWorker(
ForkJoinWorkerThread wt,
Throwable ex) {
WorkQueue w = null;
if (
wt != null && (
w =
wt.
workQueue) != null) {
WorkQueue[]
ws; // remove index from array
int
idx =
w.
config &
SMASK;
int
rs =
lockRunState();
if ((
ws =
workQueues) != null &&
ws.length >
idx &&
ws[
idx] ==
w)
ws[
idx] = null;
unlockRunState(
rs,
rs & ~
RSLOCK);
}
long
c; // decrement counts
do {} while (!
U.
compareAndSwapLong
(this,
CTL,
c =
ctl, ((
AC_MASK & (
c -
AC_UNIT)) |
(
TC_MASK & (
c -
TC_UNIT)) |
(
SP_MASK &
c))));
if (
w != null) {
w.
qlock = -1; // ensure set
w.
transferStealCount(this);
w.
cancelAll(); // cancel remaining tasks
}
for (;;) { // possibly replace
WorkQueue[]
ws; int
m,
sp;
if (
tryTerminate(false, false) ||
w == null ||
w.
array == null ||
(
runState &
STOP) != 0 || (
ws =
workQueues) == null ||
(
m =
ws.length - 1) < 0) // already terminating
break;
if ((
sp = (int)(
c =
ctl)) != 0) { // wake up replacement
if (
tryRelease(
c,
ws[
sp &
m],
AC_UNIT))
break;
}
else if (
ex != null && (
c &
ADD_WORKER) != 0L) {
tryAddWorker(
c); // create replacement
break;
}
else // don't need replacement
break;
}
if (
ex == null) // help clean on way out
ForkJoinTask.
helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.
rethrow(
ex);
}
// Signalling
/**
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* @param q a WorkQueue --if non-null, don't retry if now empty
*/
final void
signalWork(
WorkQueue[]
ws,
WorkQueue q) {
long
c; int
sp,
i;
WorkQueue v;
Thread p;
while ((
c =
ctl) < 0L) { // too few active
if ((
sp = (int)
c) == 0) { // no idle workers
if ((
c &
ADD_WORKER) != 0L) // too few workers
tryAddWorker(
c);
break;
}
if (
ws == null) // unstarted/terminated
break;
if (
ws.length <= (
i =
sp &
SMASK)) // terminated
break;
if ((
v =
ws[
i]) == null) // terminating
break;
int
vs = (
sp +
SS_SEQ) & ~
INACTIVE; // next scanState
int
d =
sp -
v.
scanState; // screen CAS
long
nc = (
UC_MASK & (
c +
AC_UNIT)) | (
SP_MASK &
v.
stackPred);
if (
d == 0 &&
U.
compareAndSwapLong(this,
CTL,
c,
nc)) {
v.
scanState =
vs; // activate v
if ((
p =
v.
parker) != null)
U.
unpark(
p);
break;
}
if (
q != null &&
q.
base ==
q.
top) // no more work
break;
}
}
/**
* Signals and releases worker v if it is top of idle worker
* stack. This performs a one-shot version of signalWork only if
* there is (apparently) at least one idle worker.
*
* @param c incoming ctl value
* @param v if non-null, a worker
* @param inc the increment to active count (zero when compensating)
* @return true if successful
*/
private boolean
tryRelease(long
c,
WorkQueue v, long
inc) {
int
sp = (int)
c,
vs = (
sp +
SS_SEQ) & ~
INACTIVE;
Thread p;
if (
v != null &&
v.
scanState ==
sp) { // v is at top of stack
long
nc = (
UC_MASK & (
c +
inc)) | (
SP_MASK &
v.
stackPred);
if (
U.
compareAndSwapLong(this,
CTL,
c,
nc)) {
v.
scanState =
vs;
if ((
p =
v.
parker) != null)
U.
unpark(
p);
return true;
}
}
return false;
}
// Scanning for tasks
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void
runWorker(
WorkQueue w) {
w.
growArray(); // allocate queue
int
seed =
w.
hint; // initially holds randomization hint
int
r = (
seed == 0) ? 1 :
seed; // avoid 0 for xorShift
for (
ForkJoinTask<?>
t;;) {
if ((
t =
scan(
w,
r)) != null)
w.
runTask(
t);
else if (!
awaitWork(
w,
r))
break;
r ^=
r << 13;
r ^=
r >>> 17;
r ^=
r << 5; // xorshift
}
}
/**
* Scans for and tries to steal a top-level task. Scans start at a
* random location, randomly moving on apparent contention,
* otherwise continuing linearly until reaching two consecutive
* empty passes over all queues with the same checksum (summing
* each base index of each queue, that moves on each steal), at
* which point the worker tries to inactivate and then re-scans,
* attempting to re-activate (itself or some other worker) if
* finding a task; otherwise returning null to await work. Scans
* otherwise touch as little memory as possible, to reduce
* disruption on other scanning threads.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
* @return a task, or null if none found
*/
private
ForkJoinTask<?>
scan(
WorkQueue w, int
r) {
WorkQueue[]
ws; int
m;
if ((
ws =
workQueues) != null && (
m =
ws.length - 1) > 0 &&
w != null) {
int
ss =
w.
scanState; // initially non-negative
for (int
origin =
r &
m,
k =
origin,
oldSum = 0,
checkSum = 0;;) {
WorkQueue q;
ForkJoinTask<?>[]
a;
ForkJoinTask<?>
t;
int
b,
n; long
c;
if ((
q =
ws[
k]) != null) {
if ((
n = (
b =
q.
base) -
q.
top) < 0 &&
(
a =
q.
array) != null) { // non-empty
long
i = (((
a.length - 1) &
b) <<
ASHIFT) +
ABASE;
if ((
t = ((
ForkJoinTask<?>)
U.
getObjectVolatile(
a,
i))) != null &&
q.
base ==
b) {
if (
ss >= 0) {
if (
U.
compareAndSwapObject(
a,
i,
t, null)) {
q.
base =
b + 1;
if (
n < -1) // signal others
signalWork(
ws,
q);
return
t;
}
}
else if (
oldSum == 0 && // try to activate
w.
scanState < 0)
tryRelease(
c =
ctl,
ws[
m & (int)
c],
AC_UNIT);
}
if (
ss < 0) // refresh
ss =
w.
scanState;
r ^=
r << 1;
r ^=
r >>> 3;
r ^=
r << 10;
origin =
k =
r &
m; // move and rescan
oldSum =
checkSum = 0;
continue;
}
checkSum +=
b;
}
if ((
k = (
k + 1) &
m) ==
origin) { // continue until stable
if ((
ss >= 0 || (
ss == (
ss =
w.
scanState))) &&
oldSum == (
oldSum =
checkSum)) {
if (
ss < 0 ||
w.
qlock < 0) // already inactive
break;
int
ns =
ss |
INACTIVE; // try to inactivate
long
nc = ((
SP_MASK &
ns) |
(
UC_MASK & ((
c =
ctl) -
AC_UNIT)));
w.
stackPred = (int)
c; // hold prev stack top
U.
putInt(
w,
QSCANSTATE,
ns);
if (
U.
compareAndSwapLong(this,
CTL,
c,
nc))
ss =
ns;
else
w.
scanState =
ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
/**
* Possibly blocks worker w waiting for a task to steal, or
* returns false if the worker should terminate. If inactivating
* w has caused the pool to become quiescent, checks for pool
* termination, and, so long as this is not the only worker, waits
* for up to a given duration. On timeout, if ctl has not
* changed, terminates the worker, which will in turn wake up
* another worker to possibly repeat this process.
*
* @param w the calling worker
* @param r a random seed (for spins)
* @return false if the worker should terminate
*/
private boolean
awaitWork(
WorkQueue w, int
r) {
if (
w == null ||
w.
qlock < 0) // w is terminating
return false;
for (int
pred =
w.
stackPred,
spins =
SPINS,
ss;;) {
if ((
ss =
w.
scanState) >= 0)
break;
else if (
spins > 0) {
r ^=
r << 6;
r ^=
r >>> 21;
r ^=
r << 7;
if (
r >= 0 && --
spins == 0) { // randomize spins
WorkQueue v;
WorkQueue[]
ws; int
s,
j;
AtomicLong sc;
if (
pred != 0 && (
ws =
workQueues) != null &&
(
j =
pred &
SMASK) <
ws.length &&
(
v =
ws[
j]) != null && // see if pred parking
(
v.
parker == null ||
v.
scanState >= 0))
spins =
SPINS; // continue spinning
}
}
else if (
w.
qlock < 0) // recheck after spins
return false;
else if (!
Thread.
interrupted()) {
long
c,
prevctl,
parkTime,
deadline;
int
ac = (int)((
c =
ctl) >>
AC_SHIFT) + (
config &
SMASK);
if ((
ac <= 0 &&
tryTerminate(false, false)) ||
(
runState &
STOP) != 0) // pool terminating
return false;
if (
ac <= 0 &&
ss == (int)
c) { // is last waiter
prevctl = (
UC_MASK & (
c +
AC_UNIT)) | (
SP_MASK &
pred);
int
t = (short)(
c >>>
TC_SHIFT); // shrink excess spares
if (
t > 2 &&
U.
compareAndSwapLong(this,
CTL,
c,
prevctl))
return false; // else use timed wait
parkTime =
IDLE_TIMEOUT * ((
t >= 0) ? 1 : 1 -
t);
deadline =
System.
nanoTime() +
parkTime -
TIMEOUT_SLOP;
}
else
prevctl =
parkTime =
deadline = 0L;
Thread wt =
Thread.
currentThread();
U.
putObject(
wt,
PARKBLOCKER, this); // emulate LockSupport
w.
parker =
wt;
if (
w.
scanState < 0 &&
ctl ==
c) // recheck before park
U.
park(false,
parkTime);
U.
putOrderedObject(
w,
QPARKER, null);
U.
putObject(
wt,
PARKBLOCKER, null);
if (
w.
scanState >= 0)
break;
if (
parkTime != 0L &&
ctl ==
c &&
deadline -
System.
nanoTime() <= 0L &&
U.
compareAndSwapLong(this,
CTL,
c,
prevctl))
return false; // shrink pool
}
}
return true;
}
// Joining tasks
/**
* Tries to steal and run tasks within the target's computation.
* Uses a variant of the top-level algorithm, restricted to tasks
* with the given task as ancestor: It prefers taking and running
* eligible tasks popped from the worker's own queue (via
* popCC). Otherwise it scans others, randomly moving on
* contention or execution, deciding to give up based on a
* checksum (via return codes frob pollAndExecCC). The maxTasks
* argument supports external usages; internal calls use zero,
* allowing unbounded steps (external calls trap non-positive
* values).
*
* @param w caller
* @param maxTasks if non-zero, the maximum number of other tasks to run
* @return task status on exit
*/
final int
helpComplete(
WorkQueue w,
CountedCompleter<?>
task,
int
maxTasks) {
WorkQueue[]
ws; int
s = 0,
m;
if ((
ws =
workQueues) != null && (
m =
ws.length - 1) >= 0 &&
task != null &&
w != null) {
int
mode =
w.
config; // for popCC
int
r =
w.
hint ^
w.
top; // arbitrary seed for origin
int
origin =
r &
m; // first queue to scan
int
h = 1; // 1:ran, >1:contended, <0:hash
for (int
k =
origin,
oldSum = 0,
checkSum = 0;;) {
CountedCompleter<?>
p;
WorkQueue q;
if ((
s =
task.
status) < 0)
break;
if (
h == 1 && (
p =
w.
popCC(
task,
mode)) != null) {
p.
doExec(); // run local task
if (
maxTasks != 0 && --
maxTasks == 0)
break;
origin =
k; // reset
oldSum =
checkSum = 0;
}
else { // poll other queues
if ((
q =
ws[
k]) == null)
h = 0;
else if ((
h =
q.
pollAndExecCC(
task)) < 0)
checkSum +=
h;
if (
h > 0) {
if (
h == 1 &&
maxTasks != 0 && --
maxTasks == 0)
break;
r ^=
r << 13;
r ^=
r >>> 17;
r ^=
r << 5; // xorshift
origin =
k =
r &
m; // move and restart
oldSum =
checkSum = 0;
}
else if ((
k = (
k + 1) &
m) ==
origin) {
if (
oldSum == (
oldSum =
checkSum))
break;
checkSum = 0;
}
}
}
}
return
s;
}
/**
* Tries to locate and execute tasks for a stealer of the given
* task, or in turn one of its stealers, Traces currentSteal ->
* currentJoin links looking for a thread working on a descendant
* of the given task and with a non-empty queue to steal back and
* execute tasks from. The first call to this method upon a
* waiting join will often entail scanning/search, (which is OK
* because the joiner has nothing better to do), but this method
* leaves hints in workers to speed up subsequent calls.
*
* @param w caller
* @param task the task to join
*/
private void
helpStealer(
WorkQueue w,
ForkJoinTask<?>
task) {
WorkQueue[]
ws =
workQueues;
int
oldSum = 0,
checkSum,
m;
if (
ws != null && (
m =
ws.length - 1) >= 0 &&
w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?>
subtask;
WorkQueue j =
w,
v; // v is subtask stealer
descent: for (
subtask =
task;
subtask.
status >= 0; ) {
for (int
h =
j.
hint | 1,
k = 0,
i; ;
k += 2) {
if (
k >
m) // can't find stealer
break
descent;
if ((
v =
ws[
i = (
h +
k) &
m]) != null) {
if (
v.
currentSteal ==
subtask) {
j.
hint =
i;
break;
}
checkSum +=
v.
base;
}
}
for (;;) { // help v or descend
ForkJoinTask<?>[]
a; int
b;
checkSum += (
b =
v.
base);
ForkJoinTask<?>
next =
v.
currentJoin;
if (
subtask.
status < 0 ||
j.
currentJoin !=
subtask ||
v.
currentSteal !=
subtask) // stale
break
descent;
if (
b -
v.
top >= 0 || (
a =
v.
array) == null) {
if ((
subtask =
next) == null)
break
descent;
j =
v;
break;
}
int
i = (((
a.length - 1) &
b) <<
ASHIFT) +
ABASE;
ForkJoinTask<?>
t = ((
ForkJoinTask<?>)
U.
getObjectVolatile(
a,
i));
if (
v.
base ==
b) {
if (
t == null) // stale
break
descent;
if (
U.
compareAndSwapObject(
a,
i,
t, null)) {
v.
base =
b + 1;
ForkJoinTask<?>
ps =
w.
currentSteal;
int
top =
w.
top;
do {
U.
putOrderedObject(
w,
QCURRENTSTEAL,
t);
t.
doExec(); // clear local tasks too
} while (
task.
status >= 0 &&
w.
top !=
top &&
(
t =
w.
pop()) != null);
U.
putOrderedObject(
w,
QCURRENTSTEAL,
ps);
if (
w.
base !=
w.
top)
return; // can't further help
}
}
}
}
} while (
task.
status >= 0 &&
oldSum != (
oldSum =
checkSum));
}
}
/**
* Tries to decrement active count (sometimes implicitly) and
* possibly release or create a compensating worker in preparation
* for blocking. Returns false (retryable by caller), on
* contention, detected staleness, instability, or termination.
*
* @param w caller
*/
private boolean
tryCompensate(
WorkQueue w) {
boolean
canBlock;
WorkQueue[]
ws; long
c; int
m,
pc,
sp;
if (
w == null ||
w.
qlock < 0 || // caller terminating
(
ws =
workQueues) == null || (
m =
ws.length - 1) <= 0 ||
(
pc =
config &
SMASK) == 0) // parallelism disabled
canBlock = false;
else if ((
sp = (int)(
c =
ctl)) != 0) // release idle worker
canBlock =
tryRelease(
c,
ws[
sp &
m], 0L);
else {
int
ac = (int)(
c >>
AC_SHIFT) +
pc;
int
tc = (short)(
c >>
TC_SHIFT) +
pc;
int
nbusy = 0; // validate saturation
for (int
i = 0;
i <=
m; ++
i) { // two passes of odd indices
WorkQueue v;
if ((
v =
ws[((
i << 1) | 1) &
m]) != null) {
if ((
v.
scanState &
SCANNING) != 0)
break;
++
nbusy;
}
}
if (
nbusy != (
tc << 1) ||
ctl !=
c)
canBlock = false; // unstable or stale
else if (
tc >=
pc &&
ac > 1 &&
w.
isEmpty()) {
long
nc = ((
AC_MASK & (
c -
AC_UNIT)) |
(~
AC_MASK &
c)); // uncompensated
canBlock =
U.
compareAndSwapLong(this,
CTL,
c,
nc);
}
else if (
tc >=
MAX_CAP ||
(this ==
common &&
tc >=
pc +
commonMaxSpares))
throw new
RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
else { // similar to tryAddWorker
boolean
add = false; int
rs; // CAS within lock
long
nc = ((
AC_MASK &
c) |
(
TC_MASK & (
c +
TC_UNIT)));
if (((
rs =
lockRunState()) &
STOP) == 0)
add =
U.
compareAndSwapLong(this,
CTL,
c,
nc);
unlockRunState(
rs,
rs & ~
RSLOCK);
canBlock =
add &&
createWorker(); // throws on exception
}
}
return
canBlock;
}
/**
* Helps and/or blocks until the given task is done or timeout.
*
* @param w caller
* @param task the task
* @param deadline for timed waits, if nonzero
* @return task status on exit
*/
final int
awaitJoin(
WorkQueue w,
ForkJoinTask<?>
task, long
deadline) {
int
s = 0;
if (
task != null &&
w != null) {
ForkJoinTask<?>
prevJoin =
w.
currentJoin;
U.
putOrderedObject(
w,
QCURRENTJOIN,
task);
CountedCompleter<?>
cc = (
task instanceof
CountedCompleter) ?
(
CountedCompleter<?>)
task : null;
for (;;) {
if ((
s =
task.
status) < 0)
break;
if (
cc != null)
helpComplete(
w,
cc, 0);
else if (
w.
base ==
w.
top ||
w.
tryRemoveAndExec(
task))
helpStealer(
w,
task);
if ((
s =
task.
status) < 0)
break;
long
ms,
ns;
if (
deadline == 0L)
ms = 0L;
else if ((
ns =
deadline -
System.
nanoTime()) <= 0L)
break;
else if ((
ms =
TimeUnit.
NANOSECONDS.
toMillis(
ns)) <= 0L)
ms = 1L;
if (
tryCompensate(
w)) {
task.
internalWait(
ms);
U.
getAndAddLong(this,
CTL,
AC_UNIT);
}
}
U.
putOrderedObject(
w,
QCURRENTJOIN,
prevJoin);
}
return
s;
}
// Specialized scanning
/**
* Returns a (probably) non-empty steal queue, if one is found
* during a scan, else null. This method must be retried by
* caller if, by the time it tries to use the queue, it is empty.
*/
private
WorkQueue findNonEmptyStealQueue() {
WorkQueue[]
ws; int
m; // one-shot version of scan loop
int
r =
ThreadLocalRandom.
nextSecondarySeed();
if ((
ws =
workQueues) != null && (
m =
ws.length - 1) >= 0) {
for (int
origin =
r &
m,
k =
origin,
oldSum = 0,
checkSum = 0;;) {
WorkQueue q; int
b;
if ((
q =
ws[
k]) != null) {
if ((
b =
q.
base) -
q.
top < 0)
return
q;
checkSum +=
b;
}
if ((
k = (
k + 1) &
m) ==
origin) {
if (
oldSum == (
oldSum =
checkSum))
break;
checkSum = 0;
}
}
}
return null;
}
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
* when tasks cannot be found, we rescan until all others cannot
* find tasks either.
*/
final void
helpQuiescePool(
WorkQueue w) {
ForkJoinTask<?>
ps =
w.
currentSteal; // save context
for (boolean
active = true;;) {
long
c;
WorkQueue q;
ForkJoinTask<?>
t; int
b;
w.
execLocalTasks(); // run locals before each scan
if ((
q =
findNonEmptyStealQueue()) != null) {
if (!
active) { // re-establish active count
active = true;
U.
getAndAddLong(this,
CTL,
AC_UNIT);
}
if ((
b =
q.
base) -
q.
top < 0 && (
t =
q.
pollAt(
b)) != null) {
U.
putOrderedObject(
w,
QCURRENTSTEAL,
t);
t.
doExec();
if (++
w.
nsteals < 0)
w.
transferStealCount(this);
}
}
else if (
active) { // decrement active count without queuing
long
nc = (
AC_MASK & ((
c =
ctl) -
AC_UNIT)) | (~
AC_MASK &
c);
if ((int)(
nc >>
AC_SHIFT) + (
config &
SMASK) <= 0)
break; // bypass decrement-then-increment
if (
U.
compareAndSwapLong(this,
CTL,
c,
nc))
active = false;
}
else if ((int)((
c =
ctl) >>
AC_SHIFT) + (
config &
SMASK) <= 0 &&
U.
compareAndSwapLong(this,
CTL,
c,
c +
AC_UNIT))
break;
}
U.
putOrderedObject(
w,
QCURRENTSTEAL,
ps);
}
/**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
*/
final
ForkJoinTask<?>
nextTaskFor(
WorkQueue w) {
for (
ForkJoinTask<?>
t;;) {
WorkQueue q; int
b;
if ((
t =
w.
nextLocalTask()) != null)
return
t;
if ((
q =
findNonEmptyStealQueue()) == null)
return null;
if ((
b =
q.
base) -
q.
top < 0 && (
t =
q.
pollAt(
b)) != null)
return
t;
}
}
/**
* Returns a cheap heuristic guide for task partitioning when
* programmers, frameworks, tools, or languages have little or no
* idea about task granularity. In essence, by offering this
* method, we ask users only about tradeoffs in overhead vs
* expected throughput and its variance, rather than how finely to
* partition tasks.
*
* In a steady state strict (tree-structured) computation, each
* thread makes available for stealing enough tasks for other
* threads to remain active. Inductively, if all threads play by
* the same rules, each thread should make available only a
* constant number of tasks.
*
* The minimum useful constant is just 1. But using a value of 1
* would require immediate replenishment upon each steal to
* maintain enough tasks, which is infeasible. Further,
* partitionings/granularities of offered tasks should minimize
* steal rates, which in general means that threads nearer the top
* of computation tree should generate more than those nearer the
* bottom. In perfect steady state, each thread is at
* approximately the same level of computation tree. However,
* producing extra tasks amortizes the uncertainty of progress and
* diffusion assumptions.
*
* So, users will want to use values larger (but not much larger)
* than 1 to both smooth over transient shortages and hedge
* against uneven progress; as traded off against the cost of
* extra task overhead. We leave the user to pick a threshold
* value to compare with the results of this call to guide
* decisions, but recommend values such as 3.
*
* When all threads are active, it is on average OK to estimate
* surplus strictly locally. In steady-state, if one thread is
* maintaining say 2 surplus tasks, then so are others. So we can
* just use estimated queue length. However, this strategy alone
* leads to serious mis-estimates in some non-steady-state
* conditions (ramp-up, ramp-down, other stalls). We can detect
* many of these by further considering the number of "idle"
* threads, that are known to have zero queued tasks, so
* compensate by a factor of (#idle/#active) threads.
*/
static int
getSurplusQueuedTaskCount() {
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool pool;
WorkQueue q;
if (((
t =
Thread.
currentThread()) instanceof
ForkJoinWorkerThread)) {
int
p = (
pool = (
wt = (
ForkJoinWorkerThread)
t).
pool).
config &
SMASK;
int
n = (
q =
wt.
workQueue).
top -
q.
base;
int
a = (int)(
pool.
ctl >>
AC_SHIFT) +
p;
return
n - (
a > (
p >>>= 1) ? 0 :
a > (
p >>>= 1) ? 1 :
a > (
p >>>= 1) ? 2 :
a > (
p >>>= 1) ? 4 :
8);
}
return 0;
}
// Termination
/**
* Possibly initiates and/or completes termination.
*
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
* @param enable if true, enable shutdown when next possible
* @return true if now terminating or terminated
*/
private boolean
tryTerminate(boolean
now, boolean
enable) {
int
rs;
if (this ==
common) // cannot shut down
return false;
if ((
rs =
runState) >= 0) {
if (!
enable)
return false;
rs =
lockRunState(); // enter SHUTDOWN phase
unlockRunState(
rs, (
rs & ~
RSLOCK) |
SHUTDOWN);
}
if ((
rs &
STOP) == 0) {
if (!
now) { // check quiescence
for (long
oldSum = 0L;;) { // repeat until stable
WorkQueue[]
ws;
WorkQueue w; int
m,
b; long
c;
long
checkSum =
ctl;
if ((int)(
checkSum >>
AC_SHIFT) + (
config &
SMASK) > 0)
return false; // still active workers
if ((
ws =
workQueues) == null || (
m =
ws.length - 1) <= 0)
break; // check queues
for (int
i = 0;
i <=
m; ++
i) {
if ((
w =
ws[
i]) != null) {
if ((
b =
w.
base) !=
w.
top ||
w.
scanState >= 0 ||
w.
currentSteal != null) {
tryRelease(
c =
ctl,
ws[
m & (int)
c],
AC_UNIT);
return false; // arrange for recheck
}
checkSum +=
b;
if ((
i & 1) == 0)
w.
qlock = -1; // try to disable external
}
}
if (
oldSum == (
oldSum =
checkSum))
break;
}
}
if ((
runState &
STOP) == 0) {
rs =
lockRunState(); // enter STOP phase
unlockRunState(
rs, (
rs & ~
RSLOCK) |
STOP);
}
}
int
pass = 0; // 3 passes to help terminate
for (long
oldSum = 0L;;) { // or until done or stable
WorkQueue[]
ws;
WorkQueue w;
ForkJoinWorkerThread wt; int
m;
long
checkSum =
ctl;
if ((short)(
checkSum >>>
TC_SHIFT) + (
config &
SMASK) <= 0 ||
(
ws =
workQueues) == null || (
m =
ws.length - 1) <= 0) {
if ((
runState &
TERMINATED) == 0) {
rs =
lockRunState(); // done
unlockRunState(
rs, (
rs & ~
RSLOCK) |
TERMINATED);
synchronized (this) {
notifyAll(); } // for awaitTermination
}
break;
}
for (int
i = 0;
i <=
m; ++
i) {
if ((
w =
ws[
i]) != null) {
checkSum +=
w.
base;
w.
qlock = -1; // try to disable
if (
pass > 0) {
w.
cancelAll(); // clear queue
if (
pass > 1 && (
wt =
w.
owner) != null) {
if (!
wt.
isInterrupted()) {
try { // unblock join
wt.
interrupt();
} catch (
Throwable ignore) {
}
}
if (
w.
scanState < 0)
U.
unpark(
wt); // wake up
}
}
}
}
if (
checkSum !=
oldSum) { // unstable
oldSum =
checkSum;
pass = 0;
}
else if (
pass > 3 &&
pass >
m) // can't further help
break;
else if (++
pass > 1) { // try to dequeue
long
c; int
j = 0,
sp; // bound attempts
while (
j++ <=
m && (
sp = (int)(
c =
ctl)) != 0)
tryRelease(
c,
ws[
sp &
m],
AC_UNIT);
}
}
return true;
}
// External operations
/**
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool. It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
*
* @param task the task. Caller must ensure non-null.
*/
private void
externalSubmit(
ForkJoinTask<?>
task) {
int
r; // initialize caller's probe
if ((
r =
ThreadLocalRandom.
getProbe()) == 0) {
ThreadLocalRandom.
localInit();
r =
ThreadLocalRandom.
getProbe();
}
for (;;) {
WorkQueue[]
ws;
WorkQueue q; int
rs,
m,
k;
boolean
move = false;
if ((
rs =
runState) < 0) {
tryTerminate(false, false); // help terminate
throw new
RejectedExecutionException();
}
else if ((
rs &
STARTED) == 0 || // initialize
((
ws =
workQueues) == null || (
m =
ws.length - 1) < 0)) {
int
ns = 0;
rs =
lockRunState();
try {
if ((
rs &
STARTED) == 0) {
U.
compareAndSwapObject(this,
STEALCOUNTER, null,
new
AtomicLong());
// create workQueues array with size a power of two
int
p =
config &
SMASK; // ensure at least 2 slots
int
n = (
p > 1) ?
p - 1 : 1;
n |=
n >>> 1;
n |=
n >>> 2;
n |=
n >>> 4;
n |=
n >>> 8;
n |=
n >>> 16;
n = (
n + 1) << 1;
workQueues = new
WorkQueue[
n];
ns =
STARTED;
}
} finally {
unlockRunState(
rs, (
rs & ~
RSLOCK) |
ns);
}
}
else if ((
q =
ws[
k =
r &
m &
SQMASK]) != null) {
if (
q.
qlock == 0 &&
U.
compareAndSwapInt(
q,
QLOCK, 0, 1)) {
ForkJoinTask<?>[]
a =
q.
array;
int
s =
q.
top;
boolean
submitted = false; // initial submission or resizing
try { // locked version of push
if ((
a != null &&
a.length >
s + 1 -
q.
base) ||
(
a =
q.
growArray()) != null) {
int
j = (((
a.length - 1) &
s) <<
ASHIFT) +
ABASE;
U.
putOrderedObject(
a,
j,
task);
U.
putOrderedInt(
q,
QTOP,
s + 1);
submitted = true;
}
} finally {
U.
compareAndSwapInt(
q,
QLOCK, 1, 0);
}
if (
submitted) {
signalWork(
ws,
q);
return;
}
}
move = true; // move on failure
}
else if (((
rs =
runState) &
RSLOCK) == 0) { // create new queue
q = new
WorkQueue(this, null);
q.
hint =
r;
q.
config =
k |
SHARED_QUEUE;
q.
scanState =
INACTIVE;
rs =
lockRunState(); // publish index
if (
rs > 0 && (
ws =
workQueues) != null &&
k <
ws.length &&
ws[
k] == null)
ws[
k] =
q; // else terminated
unlockRunState(
rs,
rs & ~
RSLOCK);
}
else
move = true; // move if busy
if (
move)
r =
ThreadLocalRandom.
advanceProbe(
r);
}
}
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void
externalPush(
ForkJoinTask<?>
task) {
WorkQueue[]
ws;
WorkQueue q; int
m;
int
r =
ThreadLocalRandom.
getProbe();
int
rs =
runState;
if ((
ws =
workQueues) != null && (
m = (
ws.length - 1)) >= 0 &&
(
q =
ws[
m &
r &
SQMASK]) != null &&
r != 0 &&
rs > 0 &&
U.
compareAndSwapInt(
q,
QLOCK, 0, 1)) {
ForkJoinTask<?>[]
a; int
am,
n,
s;
if ((
a =
q.
array) != null &&
(
am =
a.length - 1) > (
n = (
s =
q.
top) -
q.
base)) {
int
j = ((
am &
s) <<
ASHIFT) +
ABASE;
U.
putOrderedObject(
a,
j,
task);
U.
putOrderedInt(
q,
QTOP,
s + 1);
U.
putIntVolatile(
q,
QLOCK, 0);
if (
n <= 1)
signalWork(
ws,
q);
return;
}
U.
compareAndSwapInt(
q,
QLOCK, 1, 0);
}
externalSubmit(
task);
}
/**
* Returns common pool queue for an external thread.
*/
static
WorkQueue commonSubmitterQueue() {
ForkJoinPool p =
common;
int
r =
ThreadLocalRandom.
getProbe();
WorkQueue[]
ws; int
m;
return (
p != null && (
ws =
p.
workQueues) != null &&
(
m =
ws.length - 1) >= 0) ?
ws[
m &
r &
SQMASK] : null;
}
/**
* Performs tryUnpush for an external submitter: Finds queue,
* locks if apparently non-empty, validates upon locking, and
* adjusts top. Each check can fail but rarely does.
*/
final boolean
tryExternalUnpush(
ForkJoinTask<?>
task) {
WorkQueue[]
ws;
WorkQueue w;
ForkJoinTask<?>[]
a; int
m,
s;
int
r =
ThreadLocalRandom.
getProbe();
if ((
ws =
workQueues) != null && (
m =
ws.length - 1) >= 0 &&
(
w =
ws[
m &
r &
SQMASK]) != null &&
(
a =
w.
array) != null && (
s =
w.
top) !=
w.
base) {
long
j = (((
a.length - 1) & (
s - 1)) <<
ASHIFT) +
ABASE;
if (
U.
compareAndSwapInt(
w,
QLOCK, 0, 1)) {
if (
w.
top ==
s &&
w.
array ==
a &&
U.
getObject(
a,
j) ==
task &&
U.
compareAndSwapObject(
a,
j,
task, null)) {
U.
putOrderedInt(
w,
QTOP,
s - 1);
U.
putOrderedInt(
w,
QLOCK, 0);
return true;
}
U.
compareAndSwapInt(
w,
QLOCK, 1, 0);
}
}
return false;
}
/**
* Performs helpComplete for an external submitter.
*/
final int
externalHelpComplete(
CountedCompleter<?>
task, int
maxTasks) {
WorkQueue[]
ws; int
n;
int
r =
ThreadLocalRandom.
getProbe();
return ((
ws =
workQueues) == null || (
n =
ws.length) == 0) ? 0 :
helpComplete(
ws[(
n - 1) &
r &
SQMASK],
task,
maxTasks);
}
// Exported methods
// Constructors
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
* java.lang.Runtime#availableProcessors}, using the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public
ForkJoinPool() {
this(
Math.
min(
MAX_CAP,
Runtime.
getRuntime().
availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
* level, the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public
ForkJoinPool(int
parallelism) {
this(
parallelism,
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public
ForkJoinPool(int
parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean
asyncMode) {
this(
checkParallelism(
parallelism),
checkFactory(
factory),
handler,
asyncMode ?
FIFO_QUEUE :
LIFO_QUEUE,
"ForkJoinPool-" +
nextPoolId() + "-worker-");
checkPermission();
}
private static int
checkParallelism(int
parallelism) {
if (
parallelism <= 0 ||
parallelism >
MAX_CAP)
throw new
IllegalArgumentException();
return
parallelism;
}
private static
ForkJoinWorkerThreadFactory checkFactory
(
ForkJoinWorkerThreadFactory factory) {
if (
factory == null)
throw new
NullPointerException();
return
factory;
}
/**
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
private
ForkJoinPool(int
parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int
mode,
String workerNamePrefix) {
this.
workerNamePrefix =
workerNamePrefix;
this.
factory =
factory;
this.
ueh =
handler;
this.
config = (
parallelism &
SMASK) |
mode;
long
np = (long)(-
parallelism); // offset ctl counts
this.
ctl = ((
np <<
AC_SHIFT) &
AC_MASK) | ((
np <<
TC_SHIFT) &
TC_MASK);
}
/**
* Returns the common pool instance. This pool is statically
* constructed; its run state is unaffected by attempts to {@link
* #shutdown} or {@link #shutdownNow}. However this pool and any
* ongoing processing are automatically terminated upon program
* {@link System#exit}. Any program that relies on asynchronous
* task processing to complete before program termination should
* invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
* before exit.
*
* @return the common pool instance
* @since 1.8
*/
public static
ForkJoinPool commonPool() {
// assert common != null : "static init error";
return
common;
}
// Execution methods
/**
* Performs the given task, returning its result upon completion.
* If the computation encounters an unchecked Exception or Error,
* it is rethrown as the outcome of this invocation. Rethrown
* exceptions behave in the same way as regular exceptions, but,
* when possible, contain stack traces (as displayed for example
* using {@code ex.printStackTrace()}) of both the current thread
* as well as the thread actually encountering the exception;
* minimally only the latter.
*
* @param task the task
* @param <T> the type of the task's result
* @return the task's result
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T
invoke(
ForkJoinTask<T>
task) {
if (
task == null)
throw new
NullPointerException();
externalPush(
task);
return
task.
join();
}
/**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void
execute(
ForkJoinTask<?>
task) {
if (
task == null)
throw new
NullPointerException();
externalPush(
task);
}
// AbstractExecutorService methods
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void
execute(
Runnable task) {
if (
task == null)
throw new
NullPointerException();
ForkJoinTask<?>
job;
if (
task instanceof
ForkJoinTask<?>) // avoid re-wrap
job = (
ForkJoinTask<?>)
task;
else
job = new
ForkJoinTask.
RunnableExecuteAction(
task);
externalPush(
job);
}
/**
* Submits a ForkJoinTask for execution.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T>
ForkJoinTask<T>
submit(
ForkJoinTask<T>
task) {
if (
task == null)
throw new
NullPointerException();
externalPush(
task);
return
task;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T>
ForkJoinTask<T>
submit(
Callable<T>
task) {
ForkJoinTask<T>
job = new
ForkJoinTask.
AdaptedCallable<T>(
task);
externalPush(
job);
return
job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T>
ForkJoinTask<T>
submit(
Runnable task, T
result) {
ForkJoinTask<T>
job = new
ForkJoinTask.
AdaptedRunnable<T>(
task,
result);
externalPush(
job);
return
job;
}
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public
ForkJoinTask<?>
submit(
Runnable task) {
if (
task == null)
throw new
NullPointerException();
ForkJoinTask<?>
job;
if (
task instanceof
ForkJoinTask<?>) // avoid re-wrap
job = (
ForkJoinTask<?>)
task;
else
job = new
ForkJoinTask.
AdaptedRunnableAction(
task);
externalPush(
job);
return
job;
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws RejectedExecutionException {@inheritDoc}
*/
public <T>
List<
Future<T>>
invokeAll(
Collection<? extends
Callable<T>>
tasks) {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
ArrayList<
Future<T>>
futures = new
ArrayList<>(
tasks.
size());
boolean
done = false;
try {
for (
Callable<T>
t :
tasks) {
ForkJoinTask<T>
f = new
ForkJoinTask.
AdaptedCallable<T>(
t);
futures.
add(
f);
externalPush(
f);
}
for (int
i = 0,
size =
futures.
size();
i <
size;
i++)
((
ForkJoinTask<?>)
futures.
get(
i)).
quietlyJoin();
done = true;
return
futures;
} finally {
if (!
done)
for (int
i = 0,
size =
futures.
size();
i <
size;
i++)
futures.
get(
i).
cancel(false);
}
}
/**
* Returns the factory used for constructing new workers.
*
* @return the factory used for constructing new workers
*/
public
ForkJoinWorkerThreadFactory getFactory() {
return
factory;
}
/**
* Returns the handler for internal worker threads that terminate
* due to unrecoverable errors encountered while executing tasks.
*
* @return the handler, or {@code null} if none
*/
public
UncaughtExceptionHandler getUncaughtExceptionHandler() {
return
ueh;
}
/**
* Returns the targeted parallelism level of this pool.
*
* @return the targeted parallelism level of this pool
*/
public int
getParallelism() {
int
par;
return ((
par =
config &
SMASK) > 0) ?
par : 1;
}
/**
* Returns the targeted parallelism level of the common pool.
*
* @return the targeted parallelism level of the common pool
* @since 1.8
*/
public static int
getCommonPoolParallelism() {
return
commonParallelism;
}
/**
* Returns the number of worker threads that have started but not
* yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
* @return the number of worker threads
*/
public int
getPoolSize() {
return (
config &
SMASK) + (short)(
ctl >>>
TC_SHIFT);
}
/**
* Returns {@code true} if this pool uses local first-in-first-out
* scheduling mode for forked tasks that are never joined.
*
* @return {@code true} if this pool uses async mode
*/
public boolean
getAsyncMode() {
return (
config &
FIFO_QUEUE) != 0;
}
/**
* Returns an estimate of the number of worker threads that are
* not blocked waiting to join tasks or for other managed
* synchronization. This method may overestimate the
* number of running threads.
*
* @return the number of worker threads
*/
public int
getRunningThreadCount() {
int
rc = 0;
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 1;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null &&
w.
isApparentlyUnblocked())
++
rc;
}
}
return
rc;
}
/**
* Returns an estimate of the number of threads that are currently
* stealing or executing tasks. This method may overestimate the
* number of active threads.
*
* @return the number of active threads
*/
public int
getActiveThreadCount() {
int
r = (
config &
SMASK) + (int)(
ctl >>
AC_SHIFT);
return (
r <= 0) ? 0 :
r; // suppress momentarily negative values
}
/**
* Returns {@code true} if all worker threads are currently idle.
* An idle worker is one that cannot obtain a task to execute
* because none are available to steal from other threads, and
* there are no pending submissions to the pool. This method is
* conservative; it might not return {@code true} immediately upon
* idleness of all threads, but will eventually become true if
* threads remain inactive.
*
* @return {@code true} if all threads are currently idle
*/
public boolean
isQuiescent() {
return (
config &
SMASK) + (int)(
ctl >>
AC_SHIFT) <= 0;
}
/**
* Returns an estimate of the total number of tasks stolen from
* one thread's work queue by another. The reported value
* underestimates the actual total number of steals when the pool
* is not quiescent. This value may be useful for monitoring and
* tuning fork/join programs: in general, steal counts should be
* high enough to keep threads busy, but low enough to avoid
* overhead and contention across threads.
*
* @return the number of steals
*/
public long
getStealCount() {
AtomicLong sc =
stealCounter;
long
count = (
sc == null) ? 0L :
sc.
get();
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 1;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null)
count +=
w.
nsteals;
}
}
return
count;
}
/**
* Returns an estimate of the total number of tasks currently held
* in queues by worker threads (but not including tasks submitted
* to the pool that have not begun executing). This value is only
* an approximation, obtained by iterating across all threads in
* the pool. This method may be useful for tuning task
* granularities.
*
* @return the number of queued tasks
*/
public long
getQueuedTaskCount() {
long
count = 0;
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 1;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null)
count +=
w.
queueSize();
}
}
return
count;
}
/**
* Returns an estimate of the number of tasks submitted to this
* pool that have not yet begun executing. This method may take
* time proportional to the number of submissions.
*
* @return the number of queued submissions
*/
public int
getQueuedSubmissionCount() {
int
count = 0;
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 0;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null)
count +=
w.
queueSize();
}
}
return
count;
}
/**
* Returns {@code true} if there are any tasks submitted to this
* pool that have not yet begun executing.
*
* @return {@code true} if there are any queued submissions
*/
public boolean
hasQueuedSubmissions() {
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 0;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null && !
w.
isEmpty())
return true;
}
}
return false;
}
/**
* Removes and returns the next unexecuted submission if one is
* available. This method may be useful in extensions to this
* class that re-assign work in systems with multiple pools.
*
* @return the next submission, or {@code null} if none
*/
protected
ForkJoinTask<?>
pollSubmission() {
WorkQueue[]
ws;
WorkQueue w;
ForkJoinTask<?>
t;
if ((
ws =
workQueues) != null) {
for (int
i = 0;
i <
ws.length;
i += 2) {
if ((
w =
ws[
i]) != null && (
t =
w.
poll()) != null)
return
t;
}
}
return null;
}
/**
* Removes all available unexecuted submitted and forked tasks
* from scheduling queues and adds them to the given collection,
* without altering their execution status. These may include
* artificially generated or wrapped tasks. This method is
* designed to be invoked only when the pool is known to be
* quiescent. Invocations at other times may not remove all
* tasks. A failure encountered while attempting to add elements
* to collection {@code c} may result in elements being in
* neither, either or both collections when the associated
* exception is thrown. The behavior of this operation is
* undefined if the specified collection is modified while the
* operation is in progress.
*
* @param c the collection to transfer elements into
* @return the number of elements transferred
*/
protected int
drainTasksTo(
Collection<? super
ForkJoinTask<?>>
c) {
int
count = 0;
WorkQueue[]
ws;
WorkQueue w;
ForkJoinTask<?>
t;
if ((
ws =
workQueues) != null) {
for (int
i = 0;
i <
ws.length; ++
i) {
if ((
w =
ws[
i]) != null) {
while ((
t =
w.
poll()) != null) {
c.
add(
t);
++
count;
}
}
}
}
return
count;
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state, parallelism level, and
* worker and task counts.
*
* @return a string identifying this pool, as well as its state
*/
public
String toString() {
// Use a single pass through workQueues to collect counts
long
qt = 0L,
qs = 0L; int
rc = 0;
AtomicLong sc =
stealCounter;
long
st = (
sc == null) ? 0L :
sc.
get();
long
c =
ctl;
WorkQueue[]
ws;
WorkQueue w;
if ((
ws =
workQueues) != null) {
for (int
i = 0;
i <
ws.length; ++
i) {
if ((
w =
ws[
i]) != null) {
int
size =
w.
queueSize();
if ((
i & 1) == 0)
qs +=
size;
else {
qt +=
size;
st +=
w.
nsteals;
if (
w.
isApparentlyUnblocked())
++
rc;
}
}
}
}
int
pc = (
config &
SMASK);
int
tc =
pc + (short)(
c >>>
TC_SHIFT);
int
ac =
pc + (int)(
c >>
AC_SHIFT);
if (
ac < 0) // ignore transient negative
ac = 0;
int
rs =
runState;
String level = ((
rs &
TERMINATED) != 0 ? "Terminated" :
(
rs &
STOP) != 0 ? "Terminating" :
(
rs &
SHUTDOWN) != 0 ? "Shutting down" :
"Running");
return super.toString() +
"[" +
level +
", parallelism = " +
pc +
", size = " +
tc +
", active = " +
ac +
", running = " +
rc +
", steals = " +
st +
", tasks = " +
qt +
", submissions = " +
qs +
"]";
}
/**
* Possibly initiates an orderly shutdown in which previously
* submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no effect on execution state if this
* is the {@link #commonPool()}, and no additional effect if
* already shut down. Tasks that are in the process of being
* submitted concurrently during the course of this method may or
* may not be rejected.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public void
shutdown() {
checkPermission();
tryTerminate(false, true);
}
/**
* Possibly attempts to cancel and/or stop all tasks, and reject
* all subsequently submitted tasks. Invocation has no effect on
* execution state if this is the {@link #commonPool()}, and no
* additional effect if already shut down. Otherwise, tasks that
* are in the process of being submitted or executed concurrently
* during the course of this method may or may not be
* rejected. This method cancels both existing and unexecuted
* tasks, in order to permit termination in the presence of task
* dependencies. So the method always returns an empty list
* (unlike the case for some other Executors).
*
* @return an empty list
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public
List<
Runnable>
shutdownNow() {
checkPermission();
tryTerminate(true, true);
return
Collections.
emptyList();
}
/**
* Returns {@code true} if all tasks have completed following shut down.
*
* @return {@code true} if all tasks have completed following shut down
*/
public boolean
isTerminated() {
return (
runState &
TERMINATED) != 0;
}
/**
* Returns {@code true} if the process of termination has
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, or are waiting for I/O,
* causing this executor not to properly terminate. (See the
* advisory notes for class {@link ForkJoinTask} stating that
* tasks should not normally entail blocking operations. But if
* they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean
isTerminating() {
int
rs =
runState;
return (
rs &
STOP) != 0 && (
rs &
TERMINATED) == 0;
}
/**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
*/
public boolean
isShutdown() {
return (
runState &
SHUTDOWN) != 0;
}
/**
* Blocks until all tasks have completed execution after a
* shutdown request, or the timeout occurs, or the current thread
* is interrupted, whichever happens first. Because the {@link
* #commonPool()} never terminates until program shutdown, when
* applied to the common pool, this method is equivalent to {@link
* #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
public boolean
awaitTermination(long
timeout,
TimeUnit unit)
throws
InterruptedException {
if (
Thread.
interrupted())
throw new
InterruptedException();
if (this ==
common) {
awaitQuiescence(
timeout,
unit);
return false;
}
long
nanos =
unit.
toNanos(
timeout);
if (
isTerminated())
return true;
if (
nanos <= 0L)
return false;
long
deadline =
System.
nanoTime() +
nanos;
synchronized (this) {
for (;;) {
if (
isTerminated())
return true;
if (
nanos <= 0L)
return false;
long
millis =
TimeUnit.
NANOSECONDS.
toMillis(
nanos);
wait(
millis > 0L ?
millis : 1L);
nanos =
deadline -
System.
nanoTime();
}
}
}
/**
* If called by a ForkJoinTask operating in this pool, equivalent
* in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
* waits and/or attempts to assist performing tasks until this
* pool {@link #isQuiescent} or the indicated timeout elapses.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if quiescent; {@code false} if the
* timeout elapsed.
*/
public boolean
awaitQuiescence(long
timeout,
TimeUnit unit) {
long
nanos =
unit.
toNanos(
timeout);
ForkJoinWorkerThread wt;
Thread thread =
Thread.
currentThread();
if ((
thread instanceof
ForkJoinWorkerThread) &&
(
wt = (
ForkJoinWorkerThread)
thread).
pool == this) {
helpQuiescePool(
wt.
workQueue);
return true;
}
long
startTime =
System.
nanoTime();
WorkQueue[]
ws;
int
r = 0,
m;
boolean
found = true;
while (!
isQuiescent() && (
ws =
workQueues) != null &&
(
m =
ws.length - 1) >= 0) {
if (!
found) {
if ((
System.
nanoTime() -
startTime) >
nanos)
return false;
Thread.
yield(); // cannot block
}
found = false;
for (int
j = (
m + 1) << 2;
j >= 0; --
j) {
ForkJoinTask<?>
t;
WorkQueue q; int
b,
k;
if ((
k =
r++ &
m) <=
m &&
k >= 0 && (
q =
ws[
k]) != null &&
(
b =
q.
base) -
q.
top < 0) {
found = true;
if ((
t =
q.
pollAt(
b)) != null)
t.
doExec();
break;
}
}
}
return true;
}
/**
* Waits and/or attempts to assist performing tasks indefinitely
* until the {@link #commonPool()} {@link #isQuiescent}.
*/
static void
quiesceCommonPool() {
common.
awaitQuiescence(
Long.
MAX_VALUE,
TimeUnit.
NANOSECONDS);
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
* <p>A {@code ManagedBlocker} provides two methods. Method
* {@link #isReleasable} must return {@code true} if blocking is
* not necessary. Method {@link #block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
* The unusual methods in this API accommodate synchronizers that
* may, but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
* implementations of method {@code isReleasable} must be amenable
* to repeated invocation.
*
* <p>For example, here is a ManagedBlocker based on a
* ReentrantLock:
* <pre> {@code
* class ManagedLocker implements ManagedBlocker {
* final ReentrantLock lock;
* boolean hasLock = false;
* ManagedLocker(ReentrantLock lock) { this.lock = lock; }
* public boolean block() {
* if (!hasLock)
* lock.lock();
* return true;
* }
* public boolean isReleasable() {
* return hasLock || (hasLock = lock.tryLock());
* }
* }}</pre>
*
* <p>Here is a class that possibly blocks waiting for an
* item on a given queue:
* <pre> {@code
* class QueueTaker<E> implements ManagedBlocker {
* final BlockingQueue<E> queue;
* volatile E item = null;
* QueueTaker(BlockingQueue<E> q) { this.queue = q; }
* public boolean block() throws InterruptedException {
* if (item == null)
* item = queue.take();
* return true;
* }
* public boolean isReleasable() {
* return item != null || (item = queue.poll()) != null;
* }
* public E getItem() { // call after pool.managedBlock completes
* return item;
* }
* }}</pre>
*/
public static interface
ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
*/
boolean
block() throws
InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
* @return {@code true} if blocking is unnecessary
*/
boolean
isReleasable();
}
/**
* Runs the given possibly blocking task. When {@linkplain
* ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
* method possibly arranges for a spare thread to be activated if
* necessary to ensure sufficient parallelism while the current
* thread is blocked in {@link ManagedBlocker#block blocker.block()}.
*
* <p>This method repeatedly calls {@code blocker.isReleasable()} and
* {@code blocker.block()} until either method returns {@code true}.
* Every call to {@code blocker.block()} is preceded by a call to
* {@code blocker.isReleasable()} that returned {@code false}.
*
* <p>If not running in a ForkJoinPool, this method is
* behaviorally equivalent to
* <pre> {@code
* while (!blocker.isReleasable())
* if (blocker.block())
* break;}</pre>
*
* If running in a ForkJoinPool, the pool may first be expanded to
* ensure sufficient parallelism available during the call to
* {@code blocker.block()}.
*
* @param blocker the blocker task
* @throws InterruptedException if {@code blocker.block()} did so
*/
public static void
managedBlock(
ManagedBlocker blocker)
throws
InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t =
Thread.
currentThread();
if ((
t instanceof
ForkJoinWorkerThread) &&
(
p = (
wt = (
ForkJoinWorkerThread)
t).
pool) != null) {
WorkQueue w =
wt.
workQueue;
while (!
blocker.
isReleasable()) {
if (
p.
tryCompensate(
w)) {
try {
do {} while (!
blocker.
isReleasable() &&
!
blocker.
block());
} finally {
U.
getAndAddLong(
p,
CTL,
AC_UNIT);
}
break;
}
}
}
else {
do {} while (!
blocker.
isReleasable() &&
!
blocker.
block());
}
}
// AbstractExecutorService overrides. These rely on undocumented
// fact that ForkJoinTask.adapt returns ForkJoinTasks that also
// implement RunnableFuture.
protected <T>
RunnableFuture<T>
newTaskFor(
Runnable runnable, T
value) {
return new
ForkJoinTask.
AdaptedRunnable<T>(
runnable,
value);
}
protected <T>
RunnableFuture<T>
newTaskFor(
Callable<T>
callable) {
return new
ForkJoinTask.
AdaptedCallable<T>(
callable);
}
// Unsafe mechanics
private static final sun.misc.
Unsafe U;
private static final int
ABASE;
private static final int
ASHIFT;
private static final long
CTL;
private static final long
RUNSTATE;
private static final long
STEALCOUNTER;
private static final long
PARKBLOCKER;
private static final long
QTOP;
private static final long
QLOCK;
private static final long
QSCANSTATE;
private static final long
QPARKER;
private static final long
QCURRENTSTEAL;
private static final long
QCURRENTJOIN;
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.
Unsafe.
getUnsafe();
Class<?>
k =
ForkJoinPool.class;
CTL =
U.
objectFieldOffset
(
k.
getDeclaredField("ctl"));
RUNSTATE =
U.
objectFieldOffset
(
k.
getDeclaredField("runState"));
STEALCOUNTER =
U.
objectFieldOffset
(
k.
getDeclaredField("stealCounter"));
Class<?>
tk =
Thread.class;
PARKBLOCKER =
U.
objectFieldOffset
(
tk.
getDeclaredField("parkBlocker"));
Class<?>
wk =
WorkQueue.class;
QTOP =
U.
objectFieldOffset
(
wk.
getDeclaredField("top"));
QLOCK =
U.
objectFieldOffset
(
wk.
getDeclaredField("qlock"));
QSCANSTATE =
U.
objectFieldOffset
(
wk.
getDeclaredField("scanState"));
QPARKER =
U.
objectFieldOffset
(
wk.
getDeclaredField("parker"));
QCURRENTSTEAL =
U.
objectFieldOffset
(
wk.
getDeclaredField("currentSteal"));
QCURRENTJOIN =
U.
objectFieldOffset
(
wk.
getDeclaredField("currentJoin"));
Class<?>
ak =
ForkJoinTask[].class;
ABASE =
U.
arrayBaseOffset(
ak);
int
scale =
U.
arrayIndexScale(
ak);
if ((
scale & (
scale - 1)) != 0)
throw new
Error("data type scale not a power of two");
ASHIFT = 31 -
Integer.
numberOfLeadingZeros(
scale);
} catch (
Exception e) {
throw new
Error(
e);
}
commonMaxSpares =
DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new
DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new
RuntimePermission("modifyThread");
common = java.security.
AccessController.
doPrivileged
(new java.security.
PrivilegedAction<
ForkJoinPool>() {
public
ForkJoinPool run() { return
makeCommonPool(); }});
int
par =
common.
config &
SMASK; // report 1 even if threads disabled
commonParallelism =
par > 0 ?
par : 1;
}
/**
* Creates and returns the common pool, respecting user settings
* specified via system properties.
*/
private static
ForkJoinPool makeCommonPool() {
int
parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp =
System.
getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp =
System.
getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp =
System.
getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (
pp != null)
parallelism =
Integer.
parseInt(
pp);
if (
fp != null)
factory = ((
ForkJoinWorkerThreadFactory)
ClassLoader.
getSystemClassLoader().
loadClass(
fp).
newInstance());
if (
hp != null)
handler = ((
UncaughtExceptionHandler)
ClassLoader.
getSystemClassLoader().
loadClass(
hp).
newInstance());
} catch (
Exception ignore) {
}
if (
factory == null) {
if (
System.
getSecurityManager() == null)
factory =
defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new
InnocuousForkJoinWorkerThreadFactory();
}
if (
parallelism < 0 && // default 1 less than #cores
(
parallelism =
Runtime.
getRuntime().
availableProcessors() - 1) <= 0)
parallelism = 1;
if (
parallelism >
MAX_CAP)
parallelism =
MAX_CAP;
return new
ForkJoinPool(
parallelism,
factory,
handler,
LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
/**
* Factory for innocuous worker threads
*/
static final class
InnocuousForkJoinWorkerThreadFactory
implements
ForkJoinWorkerThreadFactory {
/**
* An ACC to restrict permissions for the factory itself.
* The constructed workers have no permissions set.
*/
private static final
AccessControlContext innocuousAcc;
static {
Permissions innocuousPerms = new
Permissions();
innocuousPerms.
add(
modifyThreadPermission);
innocuousPerms.
add(new
RuntimePermission(
"enableContextClassLoaderOverride"));
innocuousPerms.
add(new
RuntimePermission(
"modifyThreadGroup"));
innocuousAcc = new
AccessControlContext(new
ProtectionDomain[] {
new
ProtectionDomain(null,
innocuousPerms)
});
}
public final
ForkJoinWorkerThread newThread(
ForkJoinPool pool) {
return (
ForkJoinWorkerThread.
InnocuousForkJoinWorkerThread)
java.security.
AccessController.
doPrivileged(
new java.security.
PrivilegedAction<
ForkJoinWorkerThread>() {
public
ForkJoinWorkerThread run() {
return new
ForkJoinWorkerThread.
InnocuousForkJoinWorkerThread(
pool);
}},
innocuousAcc);
}
}
}