/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util;
import java.io.
IOException;
import java.io.
ObjectInputStream;
import java.io.
Serializable;
import org.apache.commons.logging.
Log;
import org.apache.commons.logging.
LogFactory;
/**
* Support class for throttling concurrent access to a specific resource.
*
* <p>Designed for use as a base class, with the subclass invoking
* the {@link #beforeAccess()} and {@link #afterAccess()} methods at
* appropriate points of its workflow. Note that {@code afterAccess}
* should usually be called in a finally block!
*
* <p>The default concurrency limit of this support class is -1
* ("unbounded concurrency"). Subclasses may override this default;
* check the javadoc of the concrete class that you're using.
*
* @author Juergen Hoeller
* @since 1.2.5
* @see #setConcurrencyLimit
* @see #beforeAccess()
* @see #afterAccess()
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
* @see java.io.Serializable
*/
@
SuppressWarnings("serial")
public abstract class
ConcurrencyThrottleSupport implements
Serializable {
/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
*/
public static final int
UNBOUNDED_CONCURRENCY = -1;
/**
* Switch concurrency 'off': that is, don't allow any concurrent invocations.
*/
public static final int
NO_CONCURRENCY = 0;
/** Transient to optimize serialization */
protected transient
Log logger =
LogFactory.
getLog(
getClass());
private transient
Object monitor = new
Object();
private int
concurrencyLimit =
UNBOUNDED_CONCURRENCY;
private int
concurrencyCount = 0;
/**
* Set the maximum number of concurrent access attempts allowed.
* -1 indicates unbounded concurrency.
* <p>In principle, this limit can be changed at runtime,
* although it is generally designed as a config time setting.
* <p>NOTE: Do not switch between -1 and any concrete limit at runtime,
* as this will lead to inconsistent concurrency counts: A limit
* of -1 effectively turns off concurrency counting completely.
*/
public void
setConcurrencyLimit(int
concurrencyLimit) {
this.
concurrencyLimit =
concurrencyLimit;
}
/**
* Return the maximum number of concurrent access attempts allowed.
*/
public int
getConcurrencyLimit() {
return this.
concurrencyLimit;
}
/**
* Return whether this throttle is currently active.
* @return {@code true} if the concurrency limit for this instance is active
* @see #getConcurrencyLimit()
*/
public boolean
isThrottleActive() {
return (this.
concurrencyLimit >= 0);
}
/**
* To be invoked before the main execution logic of concrete subclasses.
* <p>This implementation applies the concurrency throttle.
* @see #afterAccess()
*/
protected void
beforeAccess() {
if (this.
concurrencyLimit ==
NO_CONCURRENCY) {
throw new
IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.
concurrencyLimit > 0) {
boolean
debug =
logger.
isDebugEnabled();
synchronized (this.
monitor) {
boolean
interrupted = false;
while (this.
concurrencyCount >= this.
concurrencyLimit) {
if (
interrupted) {
throw new
IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (
debug) {
logger.
debug("Concurrency count " + this.
concurrencyCount +
" has reached limit " + this.
concurrencyLimit + " - blocking");
}
try {
this.
monitor.
wait();
}
catch (
InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.
currentThread().
interrupt();
interrupted = true;
}
}
if (
debug) {
logger.
debug("Entering throttle at concurrency count " + this.
concurrencyCount);
}
this.
concurrencyCount++;
}
}
}
/**
* To be invoked after the main execution logic of concrete subclasses.
* @see #beforeAccess()
*/
protected void
afterAccess() {
if (this.
concurrencyLimit >= 0) {
synchronized (this.
monitor) {
this.
concurrencyCount--;
if (
logger.
isDebugEnabled()) {
logger.
debug("Returning from throttle at concurrency count " + this.
concurrencyCount);
}
this.
monitor.
notify();
}
}
}
//---------------------------------------------------------------------
// Serialization support
//---------------------------------------------------------------------
private void
readObject(
ObjectInputStream ois) throws
IOException,
ClassNotFoundException {
// Rely on default serialization, just initialize state after deserialization.
ois.
defaultReadObject();
// Initialize transient fields.
this.
logger =
LogFactory.
getLog(
getClass());
this.
monitor = new
Object();
}
}