/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.
InternalThreadLocalMap;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
StringUtil;
import io.netty.util.internal.
SystemPropertyUtil;
import io.netty.util.internal.
ThrowableUtil;
import io.netty.util.internal.logging.
InternalLogger;
import io.netty.util.internal.logging.
InternalLoggerFactory;
import java.util.concurrent.
CancellationException;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicReferenceFieldUpdater;
import static io.netty.util.internal.
ObjectUtil.checkNotNull;
import static java.util.concurrent.
TimeUnit.
MILLISECONDS;
public class
DefaultPromise<V> extends
AbstractFuture<V> implements
Promise<V> {
private static final
InternalLogger logger =
InternalLoggerFactory.
getInstance(
DefaultPromise.class);
private static final
InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.
getInstance(
DefaultPromise.class.
getName() + ".rejectedExecution");
private static final int
MAX_LISTENER_STACK_DEPTH =
Math.
min(8,
SystemPropertyUtil.
getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
@
SuppressWarnings("rawtypes")
private static final
AtomicReferenceFieldUpdater<
DefaultPromise,
Object>
RESULT_UPDATER =
AtomicReferenceFieldUpdater.
newUpdater(
DefaultPromise.class,
Object.class, "result");
private static final
Object SUCCESS = new
Object();
private static final
Object UNCANCELLABLE = new
Object();
private static final
CauseHolder CANCELLATION_CAUSE_HOLDER = new
CauseHolder(
ThrowableUtil.
unknownStackTrace(
new
CancellationException(),
DefaultPromise.class, "cancel(...)"));
private volatile
Object result;
private final
EventExecutor executor;
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private
Object listeners;
/**
* Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
*/
private short
waiters;
/**
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
*/
private boolean
notifyingListeners;
/**
* Creates a new instance.
*
* It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
*
* @param executor
* the {@link EventExecutor} which is used to notify the promise once it is complete.
* It is assumed this executor will protect against {@link StackOverflowError} exceptions.
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
*
*/
public
DefaultPromise(
EventExecutor executor) {
this.
executor =
checkNotNull(
executor, "executor");
}
/**
* See {@link #executor()} for expectations of the executor.
*/
protected
DefaultPromise() {
// only for subclasses
executor = null;
}
@
Override
public
Promise<V>
setSuccess(V
result) {
if (
setSuccess0(
result)) {
notifyListeners();
return this;
}
throw new
IllegalStateException("complete already: " + this);
}
@
Override
public boolean
trySuccess(V
result) {
if (
setSuccess0(
result)) {
notifyListeners();
return true;
}
return false;
}
@
Override
public
Promise<V>
setFailure(
Throwable cause) {
if (
setFailure0(
cause)) {
notifyListeners();
return this;
}
throw new
IllegalStateException("complete already: " + this,
cause);
}
@
Override
public boolean
tryFailure(
Throwable cause) {
if (
setFailure0(
cause)) {
notifyListeners();
return true;
}
return false;
}
@
Override
public boolean
setUncancellable() {
if (
RESULT_UPDATER.
compareAndSet(this, null,
UNCANCELLABLE)) {
return true;
}
Object result = this.
result;
return !
isDone0(
result) || !
isCancelled0(
result);
}
@
Override
public boolean
isSuccess() {
Object result = this.
result;
return
result != null &&
result !=
UNCANCELLABLE && !(
result instanceof
CauseHolder);
}
@
Override
public boolean
isCancellable() {
return
result == null;
}
@
Override
public
Throwable cause() {
Object result = this.
result;
return (
result instanceof
CauseHolder) ? ((
CauseHolder)
result).
cause : null;
}
@
Override
public
Promise<V>
addListener(
GenericFutureListener<? extends
Future<? super V>>
listener) {
checkNotNull(
listener, "listener");
synchronized (this) {
addListener0(
listener);
}
if (
isDone()) {
notifyListeners();
}
return this;
}
@
Override
public
Promise<V>
addListeners(
GenericFutureListener<? extends
Future<? super V>>...
listeners) {
checkNotNull(
listeners, "listeners");
synchronized (this) {
for (
GenericFutureListener<? extends
Future<? super V>>
listener :
listeners) {
if (
listener == null) {
break;
}
addListener0(
listener);
}
}
if (
isDone()) {
notifyListeners();
}
return this;
}
@
Override
public
Promise<V>
removeListener(final
GenericFutureListener<? extends
Future<? super V>>
listener) {
checkNotNull(
listener, "listener");
synchronized (this) {
removeListener0(
listener);
}
return this;
}
@
Override
public
Promise<V>
removeListeners(final
GenericFutureListener<? extends
Future<? super V>>...
listeners) {
checkNotNull(
listeners, "listeners");
synchronized (this) {
for (
GenericFutureListener<? extends
Future<? super V>>
listener :
listeners) {
if (
listener == null) {
break;
}
removeListener0(
listener);
}
}
return this;
}
@
Override
public
Promise<V>
await() throws
InterruptedException {
if (
isDone()) {
return this;
}
if (
Thread.
interrupted()) {
throw new
InterruptedException(
toString());
}
checkDeadLock();
synchronized (this) {
while (!
isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
@
Override
public
Promise<V>
awaitUninterruptibly() {
if (
isDone()) {
return this;
}
checkDeadLock();
boolean
interrupted = false;
synchronized (this) {
while (!
isDone()) {
incWaiters();
try {
wait();
} catch (
InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
decWaiters();
}
}
}
if (
interrupted) {
Thread.
currentThread().
interrupt();
}
return this;
}
@
Override
public boolean
await(long
timeout,
TimeUnit unit) throws
InterruptedException {
return
await0(
unit.
toNanos(
timeout), true);
}
@
Override
public boolean
await(long
timeoutMillis) throws
InterruptedException {
return
await0(
MILLISECONDS.
toNanos(
timeoutMillis), true);
}
@
Override
public boolean
awaitUninterruptibly(long
timeout,
TimeUnit unit) {
try {
return
await0(
unit.
toNanos(
timeout), false);
} catch (
InterruptedException e) {
// Should not be raised at all.
throw new
InternalError();
}
}
@
Override
public boolean
awaitUninterruptibly(long
timeoutMillis) {
try {
return
await0(
MILLISECONDS.
toNanos(
timeoutMillis), false);
} catch (
InterruptedException e) {
// Should not be raised at all.
throw new
InternalError();
}
}
@
SuppressWarnings("unchecked")
@
Override
public V
getNow() {
Object result = this.
result;
if (
result instanceof
CauseHolder ||
result ==
SUCCESS ||
result ==
UNCANCELLABLE) {
return null;
}
return (V)
result;
}
/**
* {@inheritDoc}
*
* @param mayInterruptIfRunning this value has no effect in this implementation.
*/
@
Override
public boolean
cancel(boolean
mayInterruptIfRunning) {
if (
RESULT_UPDATER.
compareAndSet(this, null,
CANCELLATION_CAUSE_HOLDER)) {
checkNotifyWaiters();
notifyListeners();
return true;
}
return false;
}
@
Override
public boolean
isCancelled() {
return
isCancelled0(
result);
}
@
Override
public boolean
isDone() {
return
isDone0(
result);
}
@
Override
public
Promise<V>
sync() throws
InterruptedException {
await();
rethrowIfFailed();
return this;
}
@
Override
public
Promise<V>
syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
@
Override
public
String toString() {
return
toStringBuilder().
toString();
}
protected
StringBuilder toStringBuilder() {
StringBuilder buf = new
StringBuilder(64)
.
append(
StringUtil.
simpleClassName(this))
.
append('@')
.
append(
Integer.
toHexString(
hashCode()));
Object result = this.
result;
if (
result ==
SUCCESS) {
buf.
append("(success)");
} else if (
result ==
UNCANCELLABLE) {
buf.
append("(uncancellable)");
} else if (
result instanceof
CauseHolder) {
buf.
append("(failure: ")
.
append(((
CauseHolder)
result).
cause)
.
append(')');
} else if (
result != null) {
buf.
append("(success: ")
.
append(
result)
.
append(')');
} else {
buf.
append("(incomplete)");
}
return
buf;
}
/**
* Get the executor used to notify listeners when this promise is complete.
* <p>
* It is assumed this executor will protect against {@link StackOverflowError} exceptions.
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
* @return The executor used to notify listeners when this promise is complete.
*/
protected
EventExecutor executor() {
return
executor;
}
protected void
checkDeadLock() {
EventExecutor e =
executor();
if (
e != null &&
e.
inEventLoop()) {
throw new
BlockingOperationException(
toString());
}
}
/**
* Notify a listener that a future has completed.
* <p>
* This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
* {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
* @param eventExecutor the executor to use to notify the listener {@code listener}.
* @param future the future that is complete.
* @param listener the listener to notify.
*/
protected static void
notifyListener(
EventExecutor eventExecutor, final
Future<?>
future, final
GenericFutureListener<?>
listener) {
checkNotNull(
eventExecutor, "eventExecutor");
checkNotNull(
future, "future");
checkNotNull(
listener, "listener");
notifyListenerWithStackOverFlowProtection(
eventExecutor,
future,
listener);
}
private void
notifyListeners() {
EventExecutor executor =
executor();
if (
executor.
inEventLoop()) {
final
InternalThreadLocalMap threadLocals =
InternalThreadLocalMap.
get();
final int
stackDepth =
threadLocals.
futureListenerStackDepth();
if (
stackDepth <
MAX_LISTENER_STACK_DEPTH) {
threadLocals.
setFutureListenerStackDepth(
stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.
setFutureListenerStackDepth(
stackDepth);
}
return;
}
}
safeExecute(
executor, new
Runnable() {
@
Override
public void
run() {
notifyListenersNow();
}
});
}
/**
* The logic in this method should be identical to {@link #notifyListeners()} but
* cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
* listener(s) may be changed and is protected by a synchronized operation.
*/
private static void
notifyListenerWithStackOverFlowProtection(final
EventExecutor executor,
final
Future<?>
future,
final
GenericFutureListener<?>
listener) {
if (
executor.
inEventLoop()) {
final
InternalThreadLocalMap threadLocals =
InternalThreadLocalMap.
get();
final int
stackDepth =
threadLocals.
futureListenerStackDepth();
if (
stackDepth <
MAX_LISTENER_STACK_DEPTH) {
threadLocals.
setFutureListenerStackDepth(
stackDepth + 1);
try {
notifyListener0(
future,
listener);
} finally {
threadLocals.
setFutureListenerStackDepth(
stackDepth);
}
return;
}
}
safeExecute(
executor, new
Runnable() {
@
Override
public void
run() {
notifyListener0(
future,
listener);
}
});
}
private void
notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (
notifyingListeners || this.
listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.
listeners;
this.
listeners = null;
}
for (;;) {
if (
listeners instanceof
DefaultFutureListeners) {
notifyListeners0((
DefaultFutureListeners)
listeners);
} else {
notifyListener0(this, (
GenericFutureListener<?>)
listeners);
}
synchronized (this) {
if (this.
listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.
listeners;
this.
listeners = null;
}
}
}
private void
notifyListeners0(
DefaultFutureListeners listeners) {
GenericFutureListener<?>[]
a =
listeners.
listeners();
int
size =
listeners.
size();
for (int
i = 0;
i <
size;
i ++) {
notifyListener0(this,
a[
i]);
}
}
@
SuppressWarnings({ "unchecked", "rawtypes" })
private static void
notifyListener0(
Future future,
GenericFutureListener l) {
try {
l.
operationComplete(
future);
} catch (
Throwable t) {
if (
logger.
isWarnEnabled()) {
logger.
warn("An exception was thrown by " +
l.
getClass().
getName() + ".operationComplete()",
t);
}
}
}
private void
addListener0(
GenericFutureListener<? extends
Future<? super V>>
listener) {
if (
listeners == null) {
listeners =
listener;
} else if (
listeners instanceof
DefaultFutureListeners) {
((
DefaultFutureListeners)
listeners).
add(
listener);
} else {
listeners = new
DefaultFutureListeners((
GenericFutureListener<?>)
listeners,
listener);
}
}
private void
removeListener0(
GenericFutureListener<? extends
Future<? super V>>
listener) {
if (
listeners instanceof
DefaultFutureListeners) {
((
DefaultFutureListeners)
listeners).
remove(
listener);
} else if (
listeners ==
listener) {
listeners = null;
}
}
private boolean
setSuccess0(V
result) {
return
setValue0(
result == null ?
SUCCESS :
result);
}
private boolean
setFailure0(
Throwable cause) {
return
setValue0(new
CauseHolder(
checkNotNull(
cause, "cause")));
}
private boolean
setValue0(
Object objResult) {
if (
RESULT_UPDATER.
compareAndSet(this, null,
objResult) ||
RESULT_UPDATER.
compareAndSet(this,
UNCANCELLABLE,
objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}
private synchronized void
checkNotifyWaiters() {
if (
waiters > 0) {
notifyAll();
}
}
private void
incWaiters() {
if (
waiters ==
Short.
MAX_VALUE) {
throw new
IllegalStateException("too many waiters: " + this);
}
++
waiters;
}
private void
decWaiters() {
--
waiters;
}
private void
rethrowIfFailed() {
Throwable cause =
cause();
if (
cause == null) {
return;
}
PlatformDependent.
throwException(
cause);
}
private boolean
await0(long
timeoutNanos, boolean
interruptable) throws
InterruptedException {
if (
isDone()) {
return true;
}
if (
timeoutNanos <= 0) {
return
isDone();
}
if (
interruptable &&
Thread.
interrupted()) {
throw new
InterruptedException(
toString());
}
checkDeadLock();
long
startTime =
System.
nanoTime();
long
waitTime =
timeoutNanos;
boolean
interrupted = false;
try {
for (;;) {
synchronized (this) {
if (
isDone()) {
return true;
}
incWaiters();
try {
wait(
waitTime / 1000000, (int) (
waitTime % 1000000));
} catch (
InterruptedException e) {
if (
interruptable) {
throw
e;
} else {
interrupted = true;
}
} finally {
decWaiters();
}
}
if (
isDone()) {
return true;
} else {
waitTime =
timeoutNanos - (
System.
nanoTime() -
startTime);
if (
waitTime <= 0) {
return
isDone();
}
}
}
} finally {
if (
interrupted) {
Thread.
currentThread().
interrupt();
}
}
}
/**
* Notify all progressive listeners.
* <p>
* No attempt is made to ensure notification order if multiple calls are made to this method before
* the original invocation completes.
* <p>
* This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
* @param progress the new progress.
* @param total the total progress.
*/
@
SuppressWarnings("unchecked")
void
notifyProgressiveListeners(final long
progress, final long
total) {
final
Object listeners =
progressiveListeners();
if (
listeners == null) {
return;
}
final
ProgressiveFuture<V>
self = (
ProgressiveFuture<V>) this;
EventExecutor executor =
executor();
if (
executor.
inEventLoop()) {
if (
listeners instanceof
GenericProgressiveFutureListener[]) {
notifyProgressiveListeners0(
self, (
GenericProgressiveFutureListener<?>[])
listeners,
progress,
total);
} else {
notifyProgressiveListener0(
self, (
GenericProgressiveFutureListener<
ProgressiveFuture<V>>)
listeners,
progress,
total);
}
} else {
if (
listeners instanceof
GenericProgressiveFutureListener[]) {
final
GenericProgressiveFutureListener<?>[]
array =
(
GenericProgressiveFutureListener<?>[])
listeners;
safeExecute(
executor, new
Runnable() {
@
Override
public void
run() {
notifyProgressiveListeners0(
self,
array,
progress,
total);
}
});
} else {
final
GenericProgressiveFutureListener<
ProgressiveFuture<V>>
l =
(
GenericProgressiveFutureListener<
ProgressiveFuture<V>>)
listeners;
safeExecute(
executor, new
Runnable() {
@
Override
public void
run() {
notifyProgressiveListener0(
self,
l,
progress,
total);
}
});
}
}
}
/**
* Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
* {@code null}.
*/
private synchronized
Object progressiveListeners() {
Object listeners = this.
listeners;
if (
listeners == null) {
// No listeners added
return null;
}
if (
listeners instanceof
DefaultFutureListeners) {
// Copy DefaultFutureListeners into an array of listeners.
DefaultFutureListeners dfl = (
DefaultFutureListeners)
listeners;
int
progressiveSize =
dfl.
progressiveSize();
switch (
progressiveSize) {
case 0:
return null;
case 1:
for (
GenericFutureListener<?>
l:
dfl.
listeners()) {
if (
l instanceof
GenericProgressiveFutureListener) {
return
l;
}
}
return null;
}
GenericFutureListener<?>[]
array =
dfl.
listeners();
GenericProgressiveFutureListener<?>[]
copy = new
GenericProgressiveFutureListener[
progressiveSize];
for (int
i = 0,
j = 0;
j <
progressiveSize;
i ++) {
GenericFutureListener<?>
l =
array[
i];
if (
l instanceof
GenericProgressiveFutureListener) {
copy[
j ++] = (
GenericProgressiveFutureListener<?>)
l;
}
}
return
copy;
} else if (
listeners instanceof
GenericProgressiveFutureListener) {
return
listeners;
} else {
// Only one listener was added and it's not a progressive listener.
return null;
}
}
private static void
notifyProgressiveListeners0(
ProgressiveFuture<?>
future,
GenericProgressiveFutureListener<?>[]
listeners, long
progress, long
total) {
for (
GenericProgressiveFutureListener<?>
l:
listeners) {
if (
l == null) {
break;
}
notifyProgressiveListener0(
future,
l,
progress,
total);
}
}
@
SuppressWarnings({ "unchecked", "rawtypes" })
private static void
notifyProgressiveListener0(
ProgressiveFuture future,
GenericProgressiveFutureListener l, long
progress, long
total) {
try {
l.
operationProgressed(
future,
progress,
total);
} catch (
Throwable t) {
if (
logger.
isWarnEnabled()) {
logger.
warn("An exception was thrown by " +
l.
getClass().
getName() + ".operationProgressed()",
t);
}
}
}
private static boolean
isCancelled0(
Object result) {
return
result instanceof
CauseHolder && ((
CauseHolder)
result).
cause instanceof
CancellationException;
}
private static boolean
isDone0(
Object result) {
return
result != null &&
result !=
UNCANCELLABLE;
}
private static final class
CauseHolder {
final
Throwable cause;
CauseHolder(
Throwable cause) {
this.
cause =
cause;
}
}
private static void
safeExecute(
EventExecutor executor,
Runnable task) {
try {
executor.
execute(
task);
} catch (
Throwable t) {
rejectedExecutionLogger.
error("Failed to submit a listener notification task. Event loop shut down?",
t);
}
}
}