/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;
import java.util.*;
import java.util.concurrent.atomic.
AtomicReference;
/**
* Registry for plugin implementations that allows global override and handles the retrieval of correct
* implementation based on order of precedence:
* <ol>
* <li>plugin registered globally via {@code register} methods in this class</li>
* <li>plugin registered and retrieved using {@link java.lang.System#getProperty(String)} (see get methods for
* property names)</li>
* <li>default implementation</li>
* </ol>
* <p>In addition to the {@code rxjava.plugin.[simple class name].implementation} system properties,
* you can define two system properties:<br>
* <pre><code>
* rxjava.plugin.[index].class
* rxjava.plugin.[index].impl
* </code></pre>
*
* Where the {@code .class} property contains the simple class name from above and the {@code .impl}
* contains the fully qualified name of the implementation class. The {@code [index]} can be
* any short string or number of your choosing. For example, you can now define a custom
* {@code RxJavaErrorHandler} via two system properties:
* <pre><code>
* rxjava.plugin.1.class=RxJavaErrorHandler
* rxjava.plugin.1.impl=some.package.MyRxJavaErrorHandler
* </code></pre>
*
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">RxJava Wiki: Plugins</a>
*
* Use the {@link RxJavaHooks} features instead, which let you change individual
* handlers at runtime.
*/
public class
RxJavaPlugins {
/** The shared registry instance returned by {@code getInstance()}. */
private static final RxJavaPlugins INSTANCE = new RxJavaPlugins();

/** Slot for the error handler; stays {@code null} until a handler is registered or resolved on first use. */
private final AtomicReference<RxJavaErrorHandler> errorHandler =
        new AtomicReference<RxJavaErrorHandler>();

/** Slot for the Observable execution hook; stays {@code null} until registered or resolved on first use. */
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook =
        new AtomicReference<RxJavaObservableExecutionHook>();

/** Slot for the Single execution hook; stays {@code null} until registered or resolved on first use. */
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook =
        new AtomicReference<RxJavaSingleExecutionHook>();

/** Slot for the Completable execution hook; stays {@code null} until registered or resolved on first use. */
private final AtomicReference<RxJavaCompletableExecutionHook> completableExecutionHook =
        new AtomicReference<RxJavaCompletableExecutionHook>();

/** Slot for the schedulers hook; stays {@code null} until registered or resolved on first use. */
private final AtomicReference<RxJavaSchedulersHook> schedulersHook =
        new AtomicReference<RxJavaSchedulersHook>();

/** Error handler used when none was registered and none is configured through properties. */
static final RxJavaErrorHandler DEFAULT_ERROR_HANDLER = new RxJavaErrorHandler() {
};
/**
 * Returns the shared {@code RxJavaPlugins} registry.
 *
 * @return the single {@code RxJavaPlugins} instance
 *
 * @deprecated use the static methods of {@link RxJavaHooks}.
 */
@Deprecated
public static RxJavaPlugins getInstance() {
    return INSTANCE;
}
/**
 * Creates a registry whose hook slots all start out empty.
 * <p>
 * Package accessible for unit tests.
 */
RxJavaPlugins() {
// nothing to do
}
/**
 * Clears every registered or lazily resolved hook so that the next lookup resolves it again.
 * <p>
 * Note that this always clears the hooks of the shared instance, whichever instance it is
 * invoked on.
 * <p>
 * This API is experimental. Resetting the plugins is dangerous during application runtime:
 * bad code could invoke it in the middle of an application life-cycle and really break
 * applications if not used cautiously.
 *
 * @see <a href="https://github.com/ReactiveX/RxJava/issues/2297">Make RxJavaPlugins.reset() public</a>
 * @since 1.3
 */
public void reset() {
    final RxJavaPlugins shared = INSTANCE;
    shared.errorHandler.set(null);
    shared.observableExecutionHook.set(null);
    shared.singleExecutionHook.set(null);
    shared.completableExecutionHook.set(null);
    shared.schedulersHook.set(null);
}
/**
 * Returns the {@link RxJavaErrorHandler} in effect, resolving it on first use.
 * <p>
 * If no handler has been set through {@link #registerErrorHandler(RxJavaErrorHandler)}, the
 * system properties are consulted (for example
 * {@code rxjava.plugin.RxJavaErrorHandler.implementation} with the full class name to load);
 * if they configure nothing, the default handler is installed.
 *
 * @return {@link RxJavaErrorHandler} implementation to use
 */
public RxJavaErrorHandler getErrorHandler() {
    if (errorHandler.get() == null) {
        // an implementation configured through system properties outranks the default
        Object fromProperties = getPluginImplementationViaProperty(
                RxJavaErrorHandler.class, getSystemPropertiesSafe());

        RxJavaErrorHandler candidate;
        if (fromProperties != null) {
            candidate = (RxJavaErrorHandler) fromProperties;
        } else {
            candidate = DEFAULT_ERROR_HANDLER;
        }

        // only install if the slot is still empty; a concurrent winner is left in place
        errorHandler.compareAndSet(null, candidate);
    }
    // read the slot again rather than returning the candidate, so the race winner is returned
    return errorHandler.get();
}
/**
 * Installs an {@link RxJavaErrorHandler} as the global override of any injected or default
 * implementation.
 *
 * @param impl
 *            {@link RxJavaErrorHandler} implementation
 * @throws IllegalStateException
 *             if a handler is already in place, either because one was registered before or
 *             because the handler was already resolved by an earlier lookup
 */
public void registerErrorHandler(RxJavaErrorHandler impl) {
    boolean installed = errorHandler.compareAndSet(null, impl);
    if (installed) {
        return;
    }
    throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get());
}
/**
 * Returns the {@link RxJavaObservableExecutionHook} in effect, resolving it on first use.
 * <p>
 * If no hook has been set through
 * {@link #registerObservableExecutionHook(RxJavaObservableExecutionHook)}, the system properties
 * are consulted (for example {@code rxjava.plugin.RxJavaObservableExecutionHook.implementation}
 * with the full class name to load); if they configure nothing, the default hook is installed.
 *
 * @return {@link RxJavaObservableExecutionHook} implementation to use
 */
public RxJavaObservableExecutionHook getObservableExecutionHook() {
    if (observableExecutionHook.get() == null) {
        // an implementation configured through system properties outranks the default
        Object fromProperties = getPluginImplementationViaProperty(
                RxJavaObservableExecutionHook.class, getSystemPropertiesSafe());

        RxJavaObservableExecutionHook candidate;
        if (fromProperties != null) {
            candidate = (RxJavaObservableExecutionHook) fromProperties;
        } else {
            candidate = RxJavaObservableExecutionHookDefault.getInstance();
        }

        // only install if the slot is still empty; a concurrent winner is left in place
        observableExecutionHook.compareAndSet(null, candidate);
    }
    // read the slot again rather than returning the candidate, so the race winner is returned
    return observableExecutionHook.get();
}
/**
 * Installs an {@link RxJavaObservableExecutionHook} as the global override of any injected or
 * default implementation.
 *
 * @param impl
 *            {@link RxJavaObservableExecutionHook} implementation
 * @throws IllegalStateException
 *             if a hook is already in place, either because one was registered before or
 *             because the hook was already resolved by an earlier lookup
 */
public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) {
    boolean installed = observableExecutionHook.compareAndSet(null, impl);
    if (installed) {
        return;
    }
    throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get());
}
/**
 * Returns the {@link RxJavaSingleExecutionHook} in effect, resolving it on first use.
 * <p>
 * If no hook has been set through {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)},
 * the system properties are consulted (for example
 * {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the full class name to
 * load); if they configure nothing, the default hook is installed.
 *
 * @return {@link RxJavaSingleExecutionHook} implementation to use
 */
public RxJavaSingleExecutionHook getSingleExecutionHook() {
    if (singleExecutionHook.get() == null) {
        // an implementation configured through system properties outranks the default
        Object fromProperties = getPluginImplementationViaProperty(
                RxJavaSingleExecutionHook.class, getSystemPropertiesSafe());

        RxJavaSingleExecutionHook candidate;
        if (fromProperties != null) {
            candidate = (RxJavaSingleExecutionHook) fromProperties;
        } else {
            candidate = RxJavaSingleExecutionHookDefault.getInstance();
        }

        // only install if the slot is still empty; a concurrent winner is left in place
        singleExecutionHook.compareAndSet(null, candidate);
    }
    // read the slot again rather than returning the candidate, so the race winner is returned
    return singleExecutionHook.get();
}
/**
 * Installs an {@link RxJavaSingleExecutionHook} as the global override of any injected or
 * default implementation.
 *
 * @param impl
 *            {@link RxJavaSingleExecutionHook} implementation
 * @throws IllegalStateException
 *             if a hook is already in place, either because one was registered before or
 *             because the hook was already resolved by an earlier lookup
 */
public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
    boolean installed = singleExecutionHook.compareAndSet(null, impl);
    if (installed) {
        return;
    }
    throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
}
/**
 * Returns the {@link RxJavaCompletableExecutionHook} in effect, resolving it on first use.
 * <p>
 * If no hook has been set through
 * {@link #registerCompletableExecutionHook(RxJavaCompletableExecutionHook)}, the system
 * properties are consulted (for example
 * {@code rxjava.plugin.RxJavaCompletableExecutionHook.implementation} with the full class name
 * to load); if they configure nothing, a new hook with no overrides is installed.
 *
 * @return {@link RxJavaCompletableExecutionHook} implementation to use
 * @since 1.3
 */
public RxJavaCompletableExecutionHook getCompletableExecutionHook() {
    if (completableExecutionHook.get() == null) {
        // an implementation configured through system properties outranks the default
        Object fromProperties = getPluginImplementationViaProperty(
                RxJavaCompletableExecutionHook.class, getSystemPropertiesSafe());

        RxJavaCompletableExecutionHook candidate;
        if (fromProperties != null) {
            candidate = (RxJavaCompletableExecutionHook) fromProperties;
        } else {
            // the default is an anonymous subclass that overrides nothing
            candidate = new RxJavaCompletableExecutionHook() { };
        }

        // only install if the slot is still empty; a concurrent winner is left in place
        completableExecutionHook.compareAndSet(null, candidate);
    }
    // read the slot again rather than returning the candidate, so the race winner is returned
    return completableExecutionHook.get();
}
/**
 * Register an {@link RxJavaCompletableExecutionHook} implementation as a global override of any
 * injected or default implementations.
 *
 * @param impl
 *            {@link RxJavaCompletableExecutionHook} implementation
 * @throws IllegalStateException
 *             if called more than once or after the default was initialized (if usage occurs
 *             before trying to register); the message names the hook that is already in place
 * @since 1.3
 */
public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) {
    if (!completableExecutionHook.compareAndSet(null, impl)) {
        // report the Completable hook that blocked the registration, not the Single hook
        throw new IllegalStateException("Another strategy was already registered: "
                + completableExecutionHook.get());
    }
}
/**
 * Fetches the system properties, falling back to an empty set when access is denied.
 * <p>
 * A security manager may prevent accessing the System properties entirely; in that case the
 * {@link SecurityException} is turned into an empty {@link Properties} object.
 *
 * @return the Properties to use for looking up settings
 */
/* test */ static Properties getSystemPropertiesSafe() {
    Properties result;
    try {
        result = System.getProperties();
    } catch (SecurityException denied) {
        result = new Properties();
    }
    return result;
}
/**
 * Resolves a plugin implementation declared through properties and instantiates it.
 * <p>
 * Two declaration styles are recognized, checked in this order:
 * <ol>
 * <li>{@code rxjava.plugin.[simple class name].implementation} holding the implementation class
 * name</li>
 * <li>a pair {@code rxjava.plugin.[index].class} / {@code rxjava.plugin.[index].impl}, where the
 * {@code .class} value equals the simple name of {@code pluginClass} and the {@code .impl} value
 * is the implementation class name</li>
 * </ol>
 * A key such as {@code rxjava.plugin.class}, in which the prefix and the {@code .class} suffix
 * overlap, carries no index and is ignored.
 *
 * @param pluginClass
 *            the plugin type to look up; its simple name is used in the property keys
 * @param propsIn
 *            the properties to search; they are cloned and never modified
 * @return a new instance of the configured implementation, or {@code null} if none is configured
 * @throws IllegalStateException
 *             if a matching {@code .class} entry has no {@code .impl} counterpart, or if the
 *             configured class cannot be found, is not a subtype of {@code pluginClass}, or
 *             cannot be instantiated or accessed
 */
/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties propsIn) {
    // Make a defensive clone because traversal may fail with ConcurrentModificationException
    // if the properties get changed by something outside RxJava.
    Properties props = (Properties) propsIn.clone();

    final String classSimpleName = pluginClass.getSimpleName();
    final String pluginPrefix = "rxjava.plugin.";

    // first style: rxjava.plugin.[simple class name].implementation
    String implementingClass = props.getProperty(pluginPrefix + classSimpleName + ".implementation");

    if (implementingClass == null) {
        // second style: rxjava.plugin.[index].class + rxjava.plugin.[index].impl
        final String classSuffix = ".class";
        final String implSuffix = ".impl";
        // A valid key holds the full prefix AND the full suffix without overlap. Without this
        // bound, "rxjava.plugin.class" matches both checks and breaks the index extraction.
        final int minKeyLength = pluginPrefix.length() + classSuffix.length();

        try {
            for (Map.Entry<Object, Object> e : props.entrySet()) {
                String key = e.getKey().toString();

                if (key.length() >= minKeyLength
                        && key.startsWith(pluginPrefix)
                        && key.endsWith(classSuffix)
                        && classSimpleName.equals(e.getValue().toString())) {

                    // the index is whatever sits between the prefix and the suffix
                    String index = key.substring(pluginPrefix.length(), key.length() - classSuffix.length());
                    String implKey = pluginPrefix + index + implSuffix;

                    implementingClass = props.getProperty(implKey);

                    if (implementingClass == null) {
                        throw new IllegalStateException("Implementing class declaration for "
                                + classSimpleName + " missing: " + implKey);
                    }

                    // the first matching declaration wins
                    break;
                }
            }
        } catch (SecurityException ex) {
            // https://github.com/ReactiveX/RxJava/issues/5819
            // We don't seem to have access to all properties.
            // At least print the exception to the console.
            ex.printStackTrace();
        }
    }

    if (implementingClass != null) {
        try {
            Class<?> cls = Class.forName(implementingClass);
            // narrow the scope (cast) to the type we're expecting
            cls = cls.asSubclass(pluginClass);
            return cls.newInstance();
        } catch (ClassCastException e) {
            throw new IllegalStateException(classSimpleName + " implementation is not an instance of "
                    + classSimpleName + ": " + implementingClass, e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(classSimpleName + " implementation class not found: "
                    + implementingClass, e);
        } catch (InstantiationException e) {
            throw new IllegalStateException(classSimpleName + " implementation not able to be instantiated: "
                    + implementingClass, e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(classSimpleName + " implementation not able to be accessed: "
                    + implementingClass, e);
        }
    }

    // nothing configured for this plugin type
    return null;
}
/**
 * Returns the {@link RxJavaSchedulersHook} in effect, resolving it on first use.
 * <p>
 * If no hook has been set through {@link #registerSchedulersHook(RxJavaSchedulersHook)}, the
 * system properties are consulted (for example
 * {@code rxjava.plugin.RxJavaSchedulersHook.implementation} with the full class name to load);
 * if they configure nothing, the default hook is installed.
 *
 * @return the {@link RxJavaSchedulersHook} implementation in use
 */
public RxJavaSchedulersHook getSchedulersHook() {
    if (schedulersHook.get() == null) {
        // an implementation configured through system properties outranks the default
        Object fromProperties = getPluginImplementationViaProperty(
                RxJavaSchedulersHook.class, getSystemPropertiesSafe());

        RxJavaSchedulersHook candidate;
        if (fromProperties != null) {
            candidate = (RxJavaSchedulersHook) fromProperties;
        } else {
            candidate = RxJavaSchedulersHook.getDefaultInstance();
        }

        // only install if the slot is still empty; a concurrent winner is left in place
        schedulersHook.compareAndSet(null, candidate);
    }
    // read the slot again rather than returning the candidate, so the race winner is returned
    return schedulersHook.get();
}
/**
 * Installs an {@link RxJavaSchedulersHook} as the global override of any injected or default
 * implementation.
 *
 * @param impl
 *            {@link RxJavaSchedulersHook} implementation
 * @throws IllegalStateException
 *             if a hook is already in place, either because one was registered before or
 *             because the hook was already resolved by an earlier lookup
 */
public void registerSchedulersHook(RxJavaSchedulersHook impl) {
    boolean installed = schedulersHook.compareAndSet(null, impl);
    if (installed) {
        return;
    }
    throw new IllegalStateException("Another strategy was already registered: " + schedulersHook.get());
}
}