/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.buffer.
ByteBuf;
import io.netty.buffer.
ByteBufAllocator;
import io.netty.channel.
Channel;
import io.netty.channel.
ChannelConfig;
import io.netty.channel.
ChannelFuture;
import io.netty.channel.
ChannelMetadata;
import io.netty.channel.
ChannelOption;
import io.netty.channel.
ChannelOutboundBuffer;
import io.netty.channel.
ChannelPipeline;
import io.netty.channel.
FileRegion;
import io.netty.channel.
RecvByteBufAllocator;
import io.netty.channel.socket.
ChannelInputShutdownEvent;
import io.netty.channel.socket.
ChannelInputShutdownReadComplete;
import io.netty.util.internal.
StringUtil;
import java.io.
IOException;
/**
* Abstract base class for OIO which reads and writes bytes from/to a Socket
*/
public abstract class
AbstractOioByteChannel extends
AbstractOioChannel {
private static final
ChannelMetadata METADATA = new
ChannelMetadata(false);
private static final
String EXPECTED_TYPES =
" (expected: " +
StringUtil.
simpleClassName(
ByteBuf.class) + ", " +
StringUtil.
simpleClassName(
FileRegion.class) + ')';
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
*/
protected
AbstractOioByteChannel(
Channel parent) {
super(
parent);
}
@
Override
public
ChannelMetadata metadata() {
return
METADATA;
}
/**
* Determine if the input side of this channel is shutdown.
* @return {@code true} if the input side of this channel is shutdown.
*/
protected abstract boolean
isInputShutdown();
/**
* Shutdown the input side of this channel.
* @return A channel future that will complete when the shutdown is complete.
*/
protected abstract
ChannelFuture shutdownInput();
private void
closeOnRead(
ChannelPipeline pipeline) {
if (
isOpen()) {
if (
Boolean.
TRUE.
equals(
config().
getOption(
ChannelOption.
ALLOW_HALF_CLOSURE))) {
shutdownInput();
pipeline.
fireUserEventTriggered(
ChannelInputShutdownEvent.
INSTANCE);
} else {
unsafe().
close(
unsafe().
voidPromise());
}
pipeline.
fireUserEventTriggered(
ChannelInputShutdownReadComplete.
INSTANCE);
}
}
private void
handleReadException(
ChannelPipeline pipeline,
ByteBuf byteBuf,
Throwable cause, boolean
close,
RecvByteBufAllocator.
Handle allocHandle) {
if (
byteBuf != null) {
if (
byteBuf.
isReadable()) {
readPending = false;
pipeline.
fireChannelRead(
byteBuf);
} else {
byteBuf.
release();
}
}
allocHandle.
readComplete();
pipeline.
fireChannelReadComplete();
pipeline.
fireExceptionCaught(
cause);
if (
close ||
cause instanceof
IOException) {
closeOnRead(
pipeline);
}
}
@
Override
protected void
doRead() {
final
ChannelConfig config =
config();
if (
isInputShutdown() || !
readPending) {
// We have to check readPending here because the Runnable to read could have been scheduled and later
// during the same read loop readPending was set to false.
return;
}
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final
ChannelPipeline pipeline =
pipeline();
final
ByteBufAllocator allocator =
config.
getAllocator();
final
RecvByteBufAllocator.
Handle allocHandle =
unsafe().
recvBufAllocHandle();
allocHandle.
reset(
config);
ByteBuf byteBuf = null;
boolean
close = false;
boolean
readData = false;
try {
byteBuf =
allocHandle.
allocate(
allocator);
do {
allocHandle.
lastBytesRead(
doReadBytes(
byteBuf));
if (
allocHandle.
lastBytesRead() <= 0) {
if (!
byteBuf.
isReadable()) { // nothing was read. release the buffer.
byteBuf.
release();
byteBuf = null;
close =
allocHandle.
lastBytesRead() < 0;
if (
close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
}
break;
} else {
readData = true;
}
final int
available =
available();
if (
available <= 0) {
break;
}
// Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
if (!
byteBuf.
isWritable()) {
final int
capacity =
byteBuf.
capacity();
final int
maxCapacity =
byteBuf.
maxCapacity();
if (
capacity ==
maxCapacity) {
allocHandle.
incMessagesRead(1);
readPending = false;
pipeline.
fireChannelRead(
byteBuf);
byteBuf =
allocHandle.
allocate(
allocator);
} else {
final int
writerIndex =
byteBuf.
writerIndex();
if (
writerIndex +
available >
maxCapacity) {
byteBuf.
capacity(
maxCapacity);
} else {
byteBuf.
ensureWritable(
available);
}
}
}
} while (
allocHandle.
continueReading());
if (
byteBuf != null) {
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
// it because allocHandle.continueReading() returned false.
if (
byteBuf.
isReadable()) {
readPending = false;
pipeline.
fireChannelRead(
byteBuf);
} else {
byteBuf.
release();
}
byteBuf = null;
}
if (
readData) {
allocHandle.
readComplete();
pipeline.
fireChannelReadComplete();
}
if (
close) {
closeOnRead(
pipeline);
}
} catch (
Throwable t) {
handleReadException(
pipeline,
byteBuf,
t,
close,
allocHandle);
} finally {
if (
readPending ||
config.
isAutoRead() || !
readData &&
isActive()) {
// Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
// should execute read() again because no data may have been read.
read();
}
}
}
@
Override
protected void
doWrite(
ChannelOutboundBuffer in) throws
Exception {
for (;;) {
Object msg =
in.
current();
if (
msg == null) {
// nothing left to write
break;
}
if (
msg instanceof
ByteBuf) {
ByteBuf buf = (
ByteBuf)
msg;
int
readableBytes =
buf.
readableBytes();
while (
readableBytes > 0) {
doWriteBytes(
buf);
int
newReadableBytes =
buf.
readableBytes();
in.
progress(
readableBytes -
newReadableBytes);
readableBytes =
newReadableBytes;
}
in.
remove();
} else if (
msg instanceof
FileRegion) {
FileRegion region = (
FileRegion)
msg;
long
transferred =
region.
transferred();
doWriteFileRegion(
region);
in.
progress(
region.
transferred() -
transferred);
in.
remove();
} else {
in.
remove(new
UnsupportedOperationException(
"unsupported message type: " +
StringUtil.
simpleClassName(
msg)));
}
}
}
@
Override
protected final
Object filterOutboundMessage(
Object msg) throws
Exception {
if (
msg instanceof
ByteBuf ||
msg instanceof
FileRegion) {
return
msg;
}
throw new
UnsupportedOperationException(
"unsupported message type: " +
StringUtil.
simpleClassName(
msg) +
EXPECTED_TYPES);
}
/**
* Return the number of bytes ready to read from the underlying Socket.
*/
protected abstract int
available();
/**
* Read bytes from the underlying Socket.
*
* @param buf the {@link ByteBuf} into which the read bytes will be written
* @return amount the number of bytes read. This may return a negative amount if the underlying
* Socket was closed
* @throws Exception is thrown if an error occurred
*/
protected abstract int
doReadBytes(
ByteBuf buf) throws
Exception;
/**
* Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
*
* @param buf the {@link ByteBuf} which holds the data to transfer
* @throws Exception is thrown if an error occurred
*/
protected abstract void
doWriteBytes(
ByteBuf buf) throws
Exception;
/**
* Write the data which is hold by the {@link FileRegion} to the underlying Socket.
*
* @param region the {@link FileRegion} which holds the data to transfer
* @throws Exception is thrown if an error occurred
*/
protected abstract void
doWriteFileRegion(
FileRegion region) throws
Exception;
}