/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.tcp;
import java.net.
SocketAddress;
import java.time.
Duration;
import java.util.
Objects;
import java.util.function.
BiFunction;
import java.util.function.
Consumer;
import java.util.function.
Function;
import java.util.function.
Supplier;
import javax.annotation.
Nullable;
import io.netty.bootstrap.
ServerBootstrap;
import io.netty.buffer.
PooledByteBufAllocator;
import io.netty.channel.
Channel;
import io.netty.channel.
ChannelOption;
import io.netty.channel.
EventLoopGroup;
import io.netty.handler.logging.
LogLevel;
import io.netty.handler.logging.
LoggingHandler;
import io.netty.handler.ssl.
SslContext;
import io.netty.handler.ssl.util.
SelfSignedCertificate;
import io.netty.util.
AttributeKey;
import io.netty.util.
NetUtil;
import org.reactivestreams.
Publisher;
import reactor.core.
Exceptions;
import reactor.core.publisher.
Mono;
import reactor.netty.
Connection;
import reactor.netty.
ConnectionObserver;
import reactor.netty.
DisposableServer;
import reactor.netty.
NettyInbound;
import reactor.netty.
NettyOutbound;
import reactor.netty.channel.
BootstrapHandlers;
import reactor.netty.resources.
LoopResources;
import reactor.util.
Logger;
import reactor.util.
Loggers;
/**
* A TcpServer allows to build in a safe immutable way a TCP server that is materialized
* and connecting when {@link #bind(ServerBootstrap)} is ultimately called.
* <p>
* <p> Internally, materialization happens in two phases, first {@link #configure()} is
* called to retrieve a ready to use {@link ServerBootstrap} then {@link #bind(ServerBootstrap)}
* is called.
* <p>
* <p> Example:
* <pre>
* {@code
* TcpServer.create()
* .doOnBind(startMetrics)
* .doOnBound(startedMetrics)
* .doOnUnbound(stopMetrics)
* .host("127.0.0.1")
* .port(1234)
* .bind()
* .block()
* }
*
* @author Stephane Maldini
*/
public abstract class
TcpServer {
/**
* Prepare a {@link TcpServer}
*
* @return a {@link TcpServer}
*/
public static
TcpServer create() {
return
TcpServerBind.
INSTANCE;
}
/**
* The address to which this server should bind on subscribe.
*
* @param bindingAddressSupplier A supplier of the address to bind to.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer addressSupplier(
Supplier<? extends
SocketAddress>
bindingAddressSupplier) {
Objects.
requireNonNull(
bindingAddressSupplier, "bindingAddressSupplier");
return
bootstrap(
b ->
b.
localAddress(
bindingAddressSupplier.
get()));
}
/**
* Inject default attribute to the future child {@link Channel} connections. They
* will be available via {@link Channel#attr(AttributeKey)}.
* If the {@code value} is {@code null}, the attribute of the specified {@code key}
* is removed.
*
* @param key the attribute key
* @param value the attribute value
* @param <T> the attribute type
*
* @return a new {@link TcpServer}
*
* @see ServerBootstrap#childAttr(AttributeKey, Object)
*/
public final <T>
TcpServer attr(
AttributeKey<T>
key, @
Nullable T
value) {
Objects.
requireNonNull(
key, "key");
return
bootstrap(
b ->
b.
childAttr(
key,
value));
}
/**
* Apply {@link ServerBootstrap} configuration given mapper taking currently configured one
* and returning a new one to be ultimately used for socket binding. <p> Configuration
* will apply during {@link #configure()} phase.
*
* @param bootstrapMapper A bootstrap mapping function to update configuration and return an
* enriched bootstrap.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer bootstrap(
Function<? super
ServerBootstrap, ? extends
ServerBootstrap>
bootstrapMapper) {
return new
TcpServerBootstrap(this,
bootstrapMapper);
}
/**
* Bind the {@link TcpServer} and return a {@link Mono} of {@link DisposableServer}. If
* {@link Mono} is cancelled, the underlying binding will be aborted. Once the {@link
* DisposableServer} has been emitted and is not necessary anymore, disposing main server
* loop must be done by the user via {@link DisposableServer#dispose()}.
*
* If updateConfiguration phase fails, a {@link Mono#error(Throwable)} will be returned;
*
* @return a {@link Mono} of {@link DisposableServer}
*/
public final
Mono<? extends
DisposableServer>
bind() {
ServerBootstrap b;
try{
b =
configure();
}
catch (
Throwable t){
Exceptions.
throwIfFatal(
t);
return
Mono.
error(
t);
}
return
bind(
b);
}
/**
* Bind the {@link TcpServer} and return a {@link Mono} of {@link DisposableServer}
*
* @param b the {@link ServerBootstrap} to bind
*
* @return a {@link Mono} of {@link DisposableServer}
*/
public abstract
Mono<? extends
DisposableServer>
bind(
ServerBootstrap b);
/**
* Start a Server in a blocking fashion, and wait for it to finish initializing. The
* returned {@link DisposableServer} offers simple server API, including to {@link
* DisposableServer#disposeNow()} shut it down in a blocking fashion. The max startup
* timeout is 45 seconds.
*
* @return a {@link DisposableServer}
*/
public final
DisposableServer bindNow() {
return
bindNow(
Duration.
ofSeconds(45));
}
/**
* Start a Server in a blocking fashion, and wait for it to finish initializing. The
* returned {@link DisposableServer} offers simple server API, including to {@link
* DisposableServer#disposeNow()} shut it down in a blocking fashion.
*
* @param timeout max startup timeout
*
* @return a {@link DisposableServer}
*/
public final
DisposableServer bindNow(
Duration timeout) {
Objects.
requireNonNull(
timeout, "timeout");
try {
return
Objects.
requireNonNull(
bind().
block(
timeout), "aborted");
}
catch (
IllegalStateException e) {
if (
e.
getMessage().
contains("blocking read")) {
throw new
IllegalStateException("HttpServer couldn't be started within "
+
timeout.
toMillis() + "ms");
}
throw
e;
}
}
/**
* Start a Server in a fully blocking fashion, not only waiting for it to initialize
* but also blocking during the full lifecycle of the client/server. Since most
* servers will be long-lived, this is more adapted to running a server out of a main
* method, only allowing shutdown of the servers through sigkill.
* <p>
* Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added by
* this method in order to properly disconnect the client/server upon receiving a
* sigkill signal.
*
* @param timeout a timeout for server shutdown
* @param onStart an optional callback on server start
*/
public final void
bindUntilJavaShutdown(
Duration timeout, @
Nullable Consumer<
DisposableServer>
onStart) {
Objects.
requireNonNull(
timeout, "timeout");
DisposableServer facade =
bindNow();
Objects.
requireNonNull(
facade, "facade");
if (
onStart != null) {
onStart.
accept(
facade);
}
Runtime.
getRuntime()
.
addShutdownHook(new
Thread(() ->
facade.
disposeNow(
timeout)));
facade.
onDispose()
.
block();
}
/**
* Materialize a ServerBootstrap from the parent {@link TcpServer} chain to use with {@link
* #bind(ServerBootstrap)} or separately
*
* @return a configured {@link ServerBootstrap}
*/
public abstract
ServerBootstrap configure();
/**
* Setup a callback called when {@link io.netty.channel.ServerChannel} is about to
* bind.
*
* @param doOnBind a consumer observing server start event
*
* @return a new {@link TcpServer}
*/
public final
TcpServer doOnBind(
Consumer<? super
ServerBootstrap>
doOnBind) {
Objects.
requireNonNull(
doOnBind, "doOnBind");
return new
TcpServerDoOn(this,
doOnBind, null, null);
}
/**
* Setup a callback called when {@link io.netty.channel.ServerChannel} is
* bound.
*
* @param doOnBound a consumer observing server started event
*
* @return a new {@link TcpServer}
*/
public final
TcpServer doOnBound(
Consumer<? super
DisposableServer>
doOnBound) {
Objects.
requireNonNull(
doOnBound, "doOnBound");
return new
TcpServerDoOn(this, null,
doOnBound, null);
}
/**
* Setup a callback called when a remote client is connected
*
* @param doOnConnection a consumer observing connected clients
*
* @return a new {@link TcpServer}
*/
public final
TcpServer doOnConnection(
Consumer<? super
Connection>
doOnConnection) {
return new
TcpServerDoOnConnection(this,
doOnConnection);
}
/**
* Setup a callback called when {@link io.netty.channel.ServerChannel} is
* unbound.
*
* @param doOnUnbind a consumer observing server stop event
*
* @return a new {@link TcpServer}
*/
public final
TcpServer doOnUnbound(
Consumer<? super
DisposableServer>
doOnUnbind) {
Objects.
requireNonNull(
doOnUnbind, "doOnUnbound");
return new
TcpServerDoOn(this, null, null,
doOnUnbind);
}
/**
* Setup all lifecycle callbacks called on or after {@link io.netty.channel.Channel}
* has been bound and after it has been unbound.
*
* @param onBind a consumer observing server start event
* @param onBound a consumer observing server started event
* @param onUnbound a consumer observing server stop event
*
* @return a new {@link TcpServer}
*/
public final
TcpServer doOnLifecycle(
Consumer<? super
ServerBootstrap>
onBind,
Consumer<? super
DisposableServer>
onBound,
Consumer<? super
DisposableServer>
onUnbound) {
Objects.
requireNonNull(
onBind, "onBind");
Objects.
requireNonNull(
onBound, "onBound");
Objects.
requireNonNull(
onUnbound, "onUnbound");
return new
TcpServerDoOn(this,
onBind,
onBound,
onUnbound);
}
/**
* Attach an IO handler to react on connected client
*
* @param handler an IO handler that can dispose underlying connection when {@link
* Publisher} terminates.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer handle(
BiFunction<? super
NettyInbound, ? super
NettyOutbound, ? extends
Publisher<
Void>>
handler) {
return new
TcpServerHandle(this,
handler);
}
/**
* The host to which this server should bind.
*
* @param host The host to bind to.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer host(
String host) {
Objects.
requireNonNull(
host, "host");
return
bootstrap(
b ->
TcpUtils.
updateHost(
b,
host));
}
/**
* Return true if that {@link TcpServer} secured via SSL transport
*
* @return true if that {@link TcpServer} secured via SSL transport
*/
public final boolean
isSecure(){
return
sslProvider() != null;
}
/**
* Remove any previously applied SSL configuration customization
*
* @return a new {@link TcpServer}
*/
public final
TcpServer noSSL() {
return new
TcpServerUnsecure(this);
}
/**
* Setup all lifecycle callbacks called on or after {@link io.netty.channel.Channel}
* has been connected and after it has been disconnected.
*
* @param observer a consumer observing state changes
*
* @return a new {@link TcpServer}
*/
public final
TcpServer observe(
ConnectionObserver observer) {
return new
TcpServerObserve(this,
observer);
}
/**
* Set a {@link ChannelOption} value for low level connection settings like {@code SO_TIMEOUT}
* or {@code SO_KEEPALIVE}. This will apply to each new channel from remote peer.
* Use a value of {@code null} to remove a previous set {@link ChannelOption}.
*
* @param key the option key
* @param value the option value
* @param <T> the option type
*
* @return new {@link TcpServer}
*
* @see ServerBootstrap#childOption(ChannelOption, Object)
*/
public final <T>
TcpServer option(
ChannelOption<T>
key, @
Nullable T
value) {
Objects.
requireNonNull(
key, "key");
return
bootstrap(
b ->
b.
childOption(
key,
value));
}
/**
* The port to which this server should bind.
*
* @param port The port to bind to.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer port(int
port) {
return
bootstrap(
b ->
TcpUtils.
updatePort(
b,
port));
}
/**
* Run IO loops on the given {@link EventLoopGroup}.
*
* @param eventLoopGroup an eventLoopGroup to share
*
* @return a new {@link TcpServer}
*/
public final
TcpServer runOn(
EventLoopGroup eventLoopGroup) {
Objects.
requireNonNull(
eventLoopGroup, "eventLoopGroup");
return
runOn(
preferNative ->
eventLoopGroup);
}
/**
* Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources}
* container. Will prefer native (epoll/kqueue) implementation if available unless the
* environment property {@code reactor.netty.native} is set to {@code false}.
*
* @param channelResources a {@link LoopResources} accepting native runtime
* expectation and returning an eventLoopGroup
*
* @return a new {@link TcpServer}
*/
public final
TcpServer runOn(
LoopResources channelResources) {
return
runOn(
channelResources,
LoopResources.
DEFAULT_NATIVE);
}
/**
* Run IO loops on a supplied {@link EventLoopGroup} from the {@link LoopResources}
* container.
*
* @param channelResources a {@link LoopResources} accepting native runtime
* expectation and returning an eventLoopGroup.
* @param preferNative Should the connector prefer native (epoll/kqueue) if available.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer runOn(
LoopResources channelResources, boolean
preferNative) {
return new
TcpServerRunOn(this,
channelResources,
preferNative);
}
/**
* Apply an SSL configuration customization via the passed {@link SslContext}. with a
* default value of {@code 10} seconds handshake timeout unless the environment
* property {@code reactor.netty.tcp.sslHandshakeTimeout} is set.
*
* If {@link SelfSignedCertificate} needs to be used, the sample below can be
* used. Note that {@link SelfSignedCertificate} should not be used in production.
* <pre>
* {@code
* SelfSignedCertificate cert = new SelfSignedCertificate();
* SslContextBuilder sslContextBuilder =
* SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
* secure(sslContextBuilder.build());
* }
*
* @param sslContext The context to set when configuring SSL
*
* @return a new {@link TcpServer}
*/
public final
TcpServer secure(
SslContext sslContext) {
return
secure(
sslProviderBuilder ->
sslProviderBuilder.
sslContext(
sslContext));
}
/**
* Apply an SSL configuration customization via the passed builder. The builder
* will produce the {@link SslContext} to be passed to with a default value of
* {@code 10} seconds handshake timeout unless the environment property {@code
* reactor.netty.tcp.sslHandshakeTimeout} is set.
*
* If {@link SelfSignedCertificate} needs to be used, the sample below can be
* used. Note that {@link SelfSignedCertificate} should not be used in production.
* <pre>
* {@code
* SelfSignedCertificate cert = new SelfSignedCertificate();
* SslContextBuilder sslContextBuilder =
* SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
* secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder));
* }
*
* @param sslProviderBuilder builder callback for further customization of SslContext.
*
* @return a new {@link TcpServer}
*/
public final
TcpServer secure(
Consumer<? super
SslProvider.
SslContextSpec>
sslProviderBuilder) {
return new
TcpServerSecure(this,
sslProviderBuilder);
}
/**
* Inject default attribute to the future {@link io.netty.channel.ServerChannel}
* selector connection.
*
* @param key the attribute key
* @param value the attribute value
* @param <T> the attribute type
*
* @return a new {@link TcpServer}
*
* @see ServerBootstrap#attr(AttributeKey, Object)
*/
public final <T>
TcpServer selectorAttr(
AttributeKey<T>
key, T
value) {
Objects.
requireNonNull(
key, "key");
return
bootstrap(
b ->
b.
attr(
key,
value));
}
/**
* Set a {@link ChannelOption} value for low level connection settings like {@code SO_TIMEOUT}
* or {@code SO_KEEPALIVE}. This will apply to parent selector channel.
*
* @param key the option key
* @param value the option value
* @param <T> the option type
*
* @return new {@link TcpServer}
*
* @see ServerBootstrap#childOption(ChannelOption, Object)
*/
public final <T>
TcpServer selectorOption(
ChannelOption<T>
key, T
value) {
return
option(
key,
value);
}
/**
* Return the current {@link SslProvider} if that {@link TcpServer} secured via SSL
* transport or null
*
* @return the current {@link SslProvider} if that {@link TcpServer} secured via SSL
* transport or null
*/
@
Nullable
public
SslProvider sslProvider() {
return null;
}
/**
* Apply a wire logger configuration using {@link TcpServer} category
* and {@code DEBUG} logger level
*
* @return a new {@link TcpServer}
*/
public final
TcpServer wiretap() {
return
bootstrap(
b ->
BootstrapHandlers.
updateLogSupport(
b,
LOGGING_HANDLER));
}
/**
* Apply a wire logger configuration using the specified category
* and {@code DEBUG} logger level
*
* @param category the logger category
*
* @return a new {@link TcpServer}
*/
public final
TcpServer wiretap(
String category) {
return
wiretap(
category,
LogLevel.
DEBUG);
}
/**
* Apply a wire logger configuration using the specified category
* and logger level
*
* @param category the logger category
* @param level the logger level
*
* @return a new {@link TcpServer}
*/
public final
TcpServer wiretap(
String category,
LogLevel level) {
Objects.
requireNonNull(
category, "category");
Objects.
requireNonNull(
level, "level");
return
bootstrap(
b ->
BootstrapHandlers.
updateLogSupport(
b,
new
LoggingHandler(
category,
level)));
}
static final int
DEFAULT_PORT = 0;
static final
LoggingHandler LOGGING_HANDLER = new
LoggingHandler(
TcpServer.class);
static final
Logger log =
Loggers.
getLogger(
TcpServer.class);
}