/*******************************************************************************
* Copyright (c) 2017-2021, org.smartboot. All rights reserved.
* project name: smart-socket
* file name: EnhanceAsynchronousSocketChannel.java
* Date: 2021-07-29
* Author: sandao (zhengjunweimail@163.com)
*
******************************************************************************/
package org.smartboot.socket.enhance;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.ReadPendingException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ShutdownChannelGroupException;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritePendingException;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 模拟JDK7的AIO处理方式
*
* @author 三刀
* @version V1.0 , 2018/5/24
*/
final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
/**
* 实际的Socket通道
*/
private final SocketChannel channel;
/**
* 处理当前连接IO事件的资源组
*/
private final EnhanceAsynchronousChannelGroup group;
/**
* 处理 read 事件的线程资源
*/
private final EnhanceAsynchronousChannelGroup.Worker readWorker;
/**
* 处理 write 事件的线程资源
*/
private final EnhanceAsynchronousChannelGroup.Worker writeWorker;
/**
* 处理 connect 事件的线程资源
*/
private final EnhanceAsynchronousChannelGroup.Worker connectWorker;
/**
* 用于接收 read 通道数据的缓冲区,经解码后腾出缓冲区以供下一批数据的读取
*/
private ByteBuffer readBuffer;
/**
* 用于接收 read 通道数据的缓冲区集合
*/
private ByteBufferArray scatteringReadBuffer;
/**
* 存放待输出数据的缓冲区
*/
private ByteBuffer writeBuffer;
/**
* 存放待输出数据的缓冲区集合
*/
private ByteBufferArray gatheringWriteBuffer;
/**
* read 回调事件处理器
*/
private CompletionHandler<Number, Object> readCompletionHandler;
/**
* write 回调事件处理器
*/
private CompletionHandler<Number, Object> writeCompletionHandler;
/**
* connect 回调事件处理器
*/
private CompletionHandler<Void, Object> connectCompletionHandler;
private FutureCompletionHandler<Void, Void> connectFuture;
private FutureCompletionHandler<? extends Number, Object> readFuture;
private FutureCompletionHandler<? extends Number, Object> writeFuture;
/**
* read 回调事件关联绑定的附件对象
*/
private Object readAttachment;
/**
* write 回调事件关联绑定的附件对象
*/
private Object writeAttachment;
/**
* connect 回调事件关联绑定的附件对象
*/
private Object connectAttachment;
private SelectionKey readSelectionKey;
private SelectionKey readFutureSelectionKey;
private SelectionKey writeSelectionKey;
private SelectionKey writeFutureSelectionKey;
private SelectionKey connectSelectionKey;
/**
* 当前是否正在执行 write 操作
*/
private boolean writePending;
/**
* 当前是否正在执行 read 操作
*/
private boolean readPending;
/**
* 当前是否正在执行 connect 操作
*/
private boolean connectionPending;
/**
* 远程连接的地址
*/
private SocketAddress remote;
private int writeInvoker;
public EnhanceAsynchronousSocketChannel(EnhanceAsynchronousChannelGroup group, SocketChannel channel) throws IOException {
super(group.provider());
this.group = group;
this.channel = channel;
readWorker = group.getReadWorker();
writeWorker = group.getWriteWorker();
connectWorker = group.getConnectWorker();
}
@Override
public void close() throws IOException {
IOException exception = null;
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (IOException e) {
exception = e;
}
if (readSelectionKey != null) {
readSelectionKey.cancel();
readSelectionKey = null;
}
if (readFutureSelectionKey != null) {
readFutureSelectionKey.cancel();
readFutureSelectionKey = null;
}
if (writeSelectionKey != null) {
writeSelectionKey.cancel();
writeSelectionKey = null;
}
if (writeFutureSelectionKey != null) {
writeFutureSelectionKey.cancel();
writeFutureSelectionKey = null;
}
if (connectSelectionKey != null) {
connectSelectionKey.cancel();
connectSelectionKey = null;
}
if (exception != null) {
throw exception;
}
}
@Override
public AsynchronousSocketChannel bind(SocketAddress local) throws IOException {
channel.bind(local);
return this;
}
@Override
public <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value) throws IOException {
channel.setOption(name, value);
return this;
}
@Override
public <T> T getOption(SocketOption<T> name) throws IOException {
return channel.getOption(name);
}
@Override
public Set<SocketOption<?>> supportedOptions() {
return channel.supportedOptions();
}
@Override
public AsynchronousSocketChannel shutdownInput() throws IOException {
channel.shutdownInput();
return this;
}
@Override
public AsynchronousSocketChannel shutdownOutput() throws IOException {
channel.shutdownOutput();
return this;
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
return channel.getRemoteAddress();
}
@Override
public <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void, ? super A> handler) {
if (group.isTerminated()) {
throw new ShutdownChannelGroupException();
}
if (channel.isConnected()) {
throw new AlreadyConnectedException();
}
if (connectionPending) {
throw new ConnectionPendingException();
}
connectionPending = true;
this.connectAttachment = attachment;
this.connectCompletionHandler = (CompletionHandler<Void, Object>) handler;
this.remote = remote;
doConnect();
}
@Override
public Future<Void> connect(SocketAddress remote) {
FutureCompletionHandler<Void, Void> connectFuture = new FutureCompletionHandler<>();
connect(remote, null, connectFuture);
this.connectFuture = connectFuture;
return connectFuture;
}
@Override
public <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer, ? super A> handler) {
read0(dst, null, timeout, unit, attachment, handler);
}
private <V extends Number, A> void read0(ByteBuffer readBuffer, ByteBufferArray scattering, long timeout, TimeUnit unit, A attachment, CompletionHandler<V, ? super A> handler) {
if (readPending) {
throw new ReadPendingException();
}
readPending = true;
this.readBuffer = readBuffer;
this.scatteringReadBuffer = scattering;
this.readAttachment = attachment;
if (timeout > 0) {
readFuture = new FutureCompletionHandler<>((CompletionHandler<Number, Object>) handl