/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.
ObjectUtil;
import io.netty.util.internal.
PlatformDependent;
import io.netty.util.internal.
UnstableApi;
import java.util.
Collection;
import java.util.
Iterator;
import java.util.
List;
import java.util.
Queue;
import java.util.concurrent.
Callable;
import java.util.concurrent.
ExecutionException;
import java.util.concurrent.
RejectedExecutionException;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.
TimeoutException;
import java.util.concurrent.atomic.
AtomicInteger;
/**
* {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what
* {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s.
*
* <p>The {@link EventExecutorGroup#next()} for the wrapped {@link EventExecutorGroup} must <strong>NOT</strong> return
* executors of type {@link OrderedEventExecutor}.
*/
@
UnstableApi
public final class
NonStickyEventExecutorGroup implements
EventExecutorGroup {
private final
EventExecutorGroup group;
private final int
maxTaskExecutePerRun;
/**
* Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
* any {@link OrderedEventExecutor}s.
*/
public
NonStickyEventExecutorGroup(
EventExecutorGroup group) {
this(
group, 1024);
}
/**
* Creates a new instance. Be aware that the given {@link EventExecutorGroup} <strong>MUST NOT</strong> contain
* any {@link OrderedEventExecutor}s.
*/
public
NonStickyEventExecutorGroup(
EventExecutorGroup group, int
maxTaskExecutePerRun) {
this.
group =
verify(
group);
this.
maxTaskExecutePerRun =
ObjectUtil.
checkPositive(
maxTaskExecutePerRun, "maxTaskExecutePerRun");
}
private static
EventExecutorGroup verify(
EventExecutorGroup group) {
Iterator<
EventExecutor>
executors =
ObjectUtil.
checkNotNull(
group, "group").
iterator();
while (
executors.
hasNext()) {
EventExecutor executor =
executors.
next();
if (
executor instanceof
OrderedEventExecutor) {
throw new
IllegalArgumentException("EventExecutorGroup " +
group
+ " contains OrderedEventExecutors: " +
executor);
}
}
return
group;
}
private
NonStickyOrderedEventExecutor newExecutor(
EventExecutor executor) {
return new
NonStickyOrderedEventExecutor(
executor,
maxTaskExecutePerRun);
}
@
Override
public boolean
isShuttingDown() {
return
group.
isShuttingDown();
}
@
Override
public
Future<?>
shutdownGracefully() {
return
group.
shutdownGracefully();
}
@
Override
public
Future<?>
shutdownGracefully(long
quietPeriod, long
timeout,
TimeUnit unit) {
return
group.
shutdownGracefully(
quietPeriod,
timeout,
unit);
}
@
Override
public
Future<?>
terminationFuture() {
return
group.
terminationFuture();
}
@
SuppressWarnings("deprecation")
@
Override
public void
shutdown() {
group.
shutdown();
}
@
SuppressWarnings("deprecation")
@
Override
public
List<
Runnable>
shutdownNow() {
return
group.
shutdownNow();
}
@
Override
public
EventExecutor next() {
return
newExecutor(
group.
next());
}
@
Override
public
Iterator<
EventExecutor>
iterator() {
final
Iterator<
EventExecutor>
itr =
group.
iterator();
return new
Iterator<
EventExecutor>() {
@
Override
public boolean
hasNext() {
return
itr.
hasNext();
}
@
Override
public
EventExecutor next() {
return
newExecutor(
itr.
next());
}
@
Override
public void
remove() {
itr.
remove();
}
};
}
@
Override
public
Future<?>
submit(
Runnable task) {
return
group.
submit(
task);
}
@
Override
public <T>
Future<T>
submit(
Runnable task, T
result) {
return
group.
submit(
task,
result);
}
@
Override
public <T>
Future<T>
submit(
Callable<T>
task) {
return
group.
submit(
task);
}
@
Override
public
ScheduledFuture<?>
schedule(
Runnable command, long
delay,
TimeUnit unit) {
return
group.
schedule(
command,
delay,
unit);
}
@
Override
public <V>
ScheduledFuture<V>
schedule(
Callable<V>
callable, long
delay,
TimeUnit unit) {
return
group.
schedule(
callable,
delay,
unit);
}
@
Override
public
ScheduledFuture<?>
scheduleAtFixedRate(
Runnable command, long
initialDelay, long
period,
TimeUnit unit) {
return
group.
scheduleAtFixedRate(
command,
initialDelay,
period,
unit);
}
@
Override
public
ScheduledFuture<?>
scheduleWithFixedDelay(
Runnable command, long
initialDelay, long
delay,
TimeUnit unit) {
return
group.
scheduleWithFixedDelay(
command,
initialDelay,
delay,
unit);
}
@
Override
public boolean
isShutdown() {
return
group.
isShutdown();
}
@
Override
public boolean
isTerminated() {
return
group.
isTerminated();
}
@
Override
public boolean
awaitTermination(long
timeout,
TimeUnit unit) throws
InterruptedException {
return
group.
awaitTermination(
timeout,
unit);
}
@
Override
public <T>
List<java.util.concurrent.
Future<T>>
invokeAll(
Collection<? extends
Callable<T>>
tasks) throws
InterruptedException {
return
group.
invokeAll(
tasks);
}
@
Override
public <T>
List<java.util.concurrent.
Future<T>>
invokeAll(
Collection<? extends
Callable<T>>
tasks, long
timeout,
TimeUnit unit) throws
InterruptedException {
return
group.
invokeAll(
tasks,
timeout,
unit);
}
@
Override
public <T> T
invokeAny(
Collection<? extends
Callable<T>>
tasks) throws
InterruptedException,
ExecutionException {
return
group.
invokeAny(
tasks);
}
@
Override
public <T> T
invokeAny(
Collection<? extends
Callable<T>>
tasks, long
timeout,
TimeUnit unit)
throws
InterruptedException,
ExecutionException,
TimeoutException {
return
group.
invokeAny(
tasks,
timeout,
unit);
}
@
Override
public void
execute(
Runnable command) {
group.
execute(
command);
}
private static final class
NonStickyOrderedEventExecutor extends
AbstractEventExecutor
implements
Runnable,
OrderedEventExecutor {
private final
EventExecutor executor;
private final
Queue<
Runnable>
tasks =
PlatformDependent.
newMpscQueue();
private static final int
NONE = 0;
private static final int
SUBMITTED = 1;
private static final int
RUNNING = 2;
private final
AtomicInteger state = new
AtomicInteger();
private final int
maxTaskExecutePerRun;
NonStickyOrderedEventExecutor(
EventExecutor executor, int
maxTaskExecutePerRun) {
super(
executor);
this.
executor =
executor;
this.
maxTaskExecutePerRun =
maxTaskExecutePerRun;
}
@
Override
public void
run() {
if (!
state.
compareAndSet(
SUBMITTED,
RUNNING)) {
return;
}
for (;;) {
int
i = 0;
try {
for (;
i <
maxTaskExecutePerRun;
i++) {
Runnable task =
tasks.
poll();
if (
task == null) {
break;
}
safeExecute(
task);
}
} finally {
if (
i ==
maxTaskExecutePerRun) {
try {
state.
set(
SUBMITTED);
executor.
execute(this);
return; // done
} catch (
Throwable ignore) {
// Reset the state back to running as we will keep on executing tasks.
state.
set(
RUNNING);
// if an error happened we should just ignore it and let the loop run again as there is not
// much else we can do. Most likely this was triggered by a full task queue. In this case
// we just will run more tasks and try again later.
}
} else {
state.
set(
NONE);
return; // done
}
}
}
}
@
Override
public boolean
inEventLoop(
Thread thread) {
return false;
}
@
Override
public boolean
inEventLoop() {
return false;
}
@
Override
public boolean
isShuttingDown() {
return
executor.
isShutdown();
}
@
Override
public
Future<?>
shutdownGracefully(long
quietPeriod, long
timeout,
TimeUnit unit) {
return
executor.
shutdownGracefully(
quietPeriod,
timeout,
unit);
}
@
Override
public
Future<?>
terminationFuture() {
return
executor.
terminationFuture();
}
@
Override
public void
shutdown() {
executor.
shutdown();
}
@
Override
public boolean
isShutdown() {
return
executor.
isShutdown();
}
@
Override
public boolean
isTerminated() {
return
executor.
isTerminated();
}
@
Override
public boolean
awaitTermination(long
timeout,
TimeUnit unit) throws
InterruptedException {
return
executor.
awaitTermination(
timeout,
unit);
}
@
Override
public void
execute(
Runnable command) {
if (!
tasks.
offer(
command)) {
throw new
RejectedExecutionException();
}
if (
state.
compareAndSet(
NONE,
SUBMITTED)) {
// Actually it could happen that the runnable was picked up in between but we not care to much and just
// execute ourself. At worst this will be a NOOP when run() is called.
try {
executor.
execute(this);
} catch (
Throwable e) {
// Not reset the state as some other Runnable may be added to the queue already in the meantime.
tasks.
remove(
command);
PlatformDependent.
throwException(
e);
}
}
}
}
}