/*******************************************************************************
* Copyright (c) 2017-2019, org.smartboot. All rights reserved.
* project name: smart-socket
* file name: TcpAioSession.java
* Date: 2019-12-31
* Author: sandao (zhengjunweimail@163.com)
*
******************************************************************************/
package org.smartboot.socket.transport;
import org.smartboot.socket.MessageProcessor;
import org.smartboot.socket.NetMonitor;
import org.smartboot.socket.StateMachineEnum;
import org.smartboot.socket.buffer.BufferPage;
import org.smartboot.socket.buffer.VirtualBuffer;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* AIO transport-layer session.
*
* <p>
* AioSession is the most central class of smart-socket; it wraps the {@link AsynchronousSocketChannel} API to simplify IO operations.
* </p>
* <p>
* The interfaces open to users are:
* </p>
* <ol>
* <li>{@link TcpAioSession#close()}</li>
* <li>{@link TcpAioSession#close(boolean)}</li>
* <li>{@link TcpAioSession#getAttachment()} </li>
* <li>{@link TcpAioSession#getInputStream()} </li>
* <li>{@link TcpAioSession#getInputStream(int)} </li>
* <li>{@link TcpAioSession#getLocalAddress()} </li>
* <li>{@link TcpAioSession#getRemoteAddress()} </li>
* <li>{@link TcpAioSession#getSessionID()} </li>
* <li>{@link TcpAioSession#isInvalid()} </li>
* <li>{@link TcpAioSession#setAttachment(Object)} </li>
* </ol>
*
* @author 三刀
* @version V1.0.0
*/
final class TcpAioSession extends AioSession {
/**
 * Completion handler for read events; the session is supplied as the attachment.
 */
private static final CompletionHandler<Integer, TcpAioSession> READ_COMPLETION_HANDLER = new CompletionHandler<Integer, TcpAioSession>() {
    /**
     * Forwards the read result to the session. Anything thrown while doing so is
     * rerouted to {@code failed} so it is handled like a failed read.
     */
    @Override
    public void completed(Integer readResult, TcpAioSession session) {
        try {
            session.readCompleted(readResult);
        } catch (Throwable error) {
            failed(error, session);
        }
    }

    /**
     * Reports {@code INPUT_EXCEPTION} to the configured processor, then requests a
     * non-immediate close of the session. The two steps are guarded separately so
     * that an exception in the notification does not skip the close.
     */
    @Override
    public void failed(Throwable cause, TcpAioSession session) {
        try {
            session.config.getProcessor().stateEvent(session, StateMachineEnum.INPUT_EXCEPTION, cause);
        } catch (Exception notifyError) {
            notifyError.printStackTrace();
        }
        try {
            session.close(false);
        } catch (Exception closeError) {
            closeError.printStackTrace();
        }
    }
};
/**
 * Completion handler for write events; the session is supplied as the attachment.
 */
private static final CompletionHandler<Integer, TcpAioSession> WRITE_COMPLETION_HANDLER = new CompletionHandler<Integer, TcpAioSession>() {
    /**
     * Forwards the write result to the session. Anything thrown while doing so is
     * rerouted to {@code failed} so it is handled like a failed write.
     */
    @Override
    public void completed(Integer writeResult, TcpAioSession session) {
        try {
            session.writeCompleted(writeResult);
        } catch (Throwable error) {
            failed(error, session);
        }
    }

    /**
     * Reports {@code OUTPUT_EXCEPTION} to the configured processor, then calls the
     * no-argument {@code close()} on the session. The two steps are guarded separately
     * so that an exception in the notification does not skip the close.
     */
    @Override
    public void failed(Throwable cause, TcpAioSession session) {
        try {
            session.config.getProcessor().stateEvent(session, StateMachineEnum.OUTPUT_EXCEPTION, cause);
        } catch (Exception notifyError) {
            notifyError.printStackTrace();
        }
        try {
            session.close();
        } catch (Exception closeError) {
            closeError.printStackTrace();
        }
    }
};
/**
* Underlying communication channel.
*/
private final AsynchronousSocketChannel channel;
/**
* Output stream; the object returned by writeBuffer().
*/
private final WriteBuffer byteBuf;
/**
* Output semaphore with a single permit; prevents errors caused by concurrent writes.
*/
private final Semaphore semaphore = new Semaphore(1);
/**
* Read buffer.
* <p>Its size depends on the setReadBufferSize configured on AioQuickClient/AioQuickServer.</p>
*/
private VirtualBuffer readBuffer;
/**
* Write buffer: the item most recently polled from byteBuf and handed to continueWrite.
*/
private VirtualBuffer writeBuffer;
/**
* Synchronous input stream.
*/
private InputStream inputStream;
/**
* Buffer page passed to the WriteBuffer constructor and to the read-buffer factory function.
*/
private final BufferPage bufferPage;
/**
* Configuration supplying the message processor, the monitor and the write-buffer settings.
*/
private final IoServerConfig config;
/**
 * Builds the session: stores its collaborators, creates the output buffer with its
 * flush callback, fires {@code NEW_SESSION} on the processor and requests the first read.
 *
 * @param channel    socket channel backing this session
 * @param config     configuration providing the processor and the write-buffer settings
 * @param bufferPage buffer page handed to the output buffer and to {@code function}
 * @param function   factory applied to {@code bufferPage} to obtain the read buffer
 */
TcpAioSession(AsynchronousSocketChannel channel, IoServerConfig config, BufferPage bufferPage, Function<BufferPage, VirtualBuffer> function) {
    this.channel = channel;
    this.config = config;
    this.bufferPage = bufferPage;
    this.function = function;
    // Flush callback: only a caller that obtains the single write permit may start a write.
    Consumer<WriteBuffer> flushConsumer = output -> {
        if (semaphore.tryAcquire()) {
            this.writeBuffer = output.poll();
            if (this.writeBuffer != null) {
                continueWrite(this.writeBuffer);
            } else {
                // Nothing was queued, so hand the permit straight back.
                semaphore.release();
            }
        }
    };
    byteBuf = new WriteBuffer(bufferPage, flushConsumer, config.getWriteBufferSize(), config.getWriteBufferCapacity());
    // Trigger the state machine before the first read is requested.
    config.getProcessor().stateEvent(this, StateMachineEnum.NEW_SESSION, null);
    doRead();
}
/**
* Factory applied to bufferPage in doRead() to obtain the read buffer.
*/
private final Function<BufferPage, VirtualBuffer> function;
/**
 * Obtains a read buffer by applying {@code function} to the buffer page, flips it,
 * and then calls {@code signalRead()}.
 */
void doRead() {
    readBuffer = function.apply(bufferPage);
    // flip() sets the limit to the current position and resets the position to zero.
    readBuffer.buffer().flip();
    signalRead();
}
/**
* Handles the completion of an asynchronous write; called from WRITE_COMPLETION_HANDLER.
* <p>
* Notifies the monitor when one is configured, then continues writing with the current or next
* buffer. When nothing is left to write, the write semaphore is released and the session is
* either closed or flushed again, depending on its status.
* </p>
*
* @param result result reported by the completed write; forwarded to the monitor
*/
void writeCompleted(int result) {
NetMonitor monitor = config.getMonitor();
if (monitor != null) {
monitor.afterWrite(this, result);
}
// Choose what to write next: keep the current buffer while it still has remaining bytes,
// otherwise clean it and poll the next item from the output buffer.
if (writeBuffer == null) {
writeBuffer = byteBuf.pollItem();
} else if (!writeBuffer.buffer().hasRemaining()) {
writeBuffer.clean();
writeBuffer = byteBuf.pollItem();
}
// Still something to send: keep the permit and continue writing.
if (writeBuffer != null) {
continueWrite(writeBuffer);
return;
}
// Nothing left to write: give the write permit back before checking the session status.
semaphore.release();
// The session may be in the Closing or Closed state at this point
if (status != SESSION_STATUS_ENABLED) {
close();
} else {
// New messages may have been added to writeCacheQueue through the write method in the meantime
byteBuf.flush();
}
}
/**
* Returns the output buffer of this session.
*
* @return the {@link WriteBuffer} held in byteBuf (the output side, not an input stream)
*/
public WriteBuffer writeBuffer() {
return byteBuf;
}
/**
 * Returns the {@link ByteBuffer} that backs the current read buffer.
 *
 * @return the byte buffer of the current read buffer
 * @throws NullPointerException if the read buffer is {@code null}
 */
@Override
public ByteBuffer readBuffer() {
    final VirtualBuffer current = readBuffer;
    return current.buffer();
}
/**
 * Advances {@code modCount} by one.
 * NOTE(review): the counter is declared and consumed outside the visible part of this file;
 * confirm how it is used before relying on this method's effect.
 */
@Override
public void awaitRead() {
    modCount += 1;
}
/**
* 是否立即关闭会话
*
* @param immediate true:立即关闭,false:响应消息发送完后关闭
*/
public synchronized void close(boolean immediate) {
//status == SESSION_STATUS_CLOSED说明close方法被重复调用
if (status == SESSION_STATUS_CLOSED) {
// System.out.println("ignore, session:" + getSessionID() + " is closed:");
return;
}
status = immediate ? SESSION_STATUS_CLOSED : SESSION_STATUS_CLOSING;
if (immediate) {
try {
byteBuf.close();
if (readBuffer != null) {
readBuffer.clean();
readBuffer = null;
}
if (writeBuffer != null) {
writeBuffer.clean();
writeBuffer = null;
}
} finally {
IOUtil.close(channel);
config.getProcessor().stateEvent(this, StateMachineEnum.SESSION_CLOSED, null);
}
} else if ((writeBuffer == null || !writeBuffer.buffer().hasRemaining()) && byteBuf.isEmpty()) {
close(true);
} else {
config.getProc
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
极简、易用、高性能的AIO通信框架 (140个子文件)
Dockerfile 353B
TcpAioSession.java 16KB
EnhanceAsynchronousSocketChannel.java 16KB
AioQuickClient.java 13KB
AioQuickServer.java 12KB
SslAsynchronousSocketChannel.java 12KB
WriteBuffer.java 9KB
SslService.java 9KB
BufferPage.java 9KB
EnhanceAsynchronousChannelGroup.java 8KB
Worker.java 7KB
RateLimiterPlugin.java 7KB
StreamMonitorPlugin.java 7KB
UdpChannel.java 6KB
EnhanceAsynchronousServerSocketChannel.java 6KB
IoServerConfig.java 6KB
HeartPlugin.java 6KB
RpcConsumerProcessor.java 6KB
MonitorPlugin.java 5KB
RpcProviderProcessor.java 5KB
UdpBootstrap.java 5KB
AioSession.java 4KB
AsynchronousSocketChannelProxy.java 4KB
SslStringClient.java 4KB
DelimiterFrameDecoder.java 4KB
StringUtils.java 4KB
UdpClient.java 4KB
UnsupportedAsynchronousSocketChannel.java 4KB
HeartPluginDemo.java 4KB
AbstractMessageProcessor.java 3KB
BufferPagePool.java 3KB
UdpClientDemo.java 3KB
StreamMonitorDemo.java 3KB
ReconnectClient.java 3KB
StringMutilClient.java 3KB
StringProtocol.java 3KB
SslStringServer.java 3KB
SslPlugin.java 3KB
VirtualBuffer.java 3KB
BufferPageMonitorPlugin.java 3KB
SocketOptionPlugin.java 3KB
EnhanceAsynchronousChannelProvider.java 3KB
RateLimiterDemo.java 3KB
StringClient.java 3KB
Clients.java 3KB
HandshakeModel.java 3KB
StateMachineEnum.java 3KB
UdpServer.java 3KB
UdpServerDemo.java 2KB
UdpAioSession.java 2KB
NetMonitor.java 2KB
PushServerProcessorMessage.java 2KB
IPBlackListPluginDemo.java 2KB
Server.java 2KB
GroupMessageProcessor.java 2KB
NettyServer.java 2KB
FutureCompletionHandler.java 2KB
LocalAddressClient.java 2KB
BlackListPlugin.java 2KB
StringDemo.java 2KB
SslDemo.java 2KB
StringServer.java 2KB
QuickTimerTask.java 2KB
ClosedClient.java 2KB
DelimiterProtocol.java 2KB
IntegerDemo.java 2KB
SmartClient.java 2KB
ClientSSLContextFactory.java 2KB
AttachKey.java 2KB
AsyncClientDemo.java 2KB
DelimiterFrameDecoderTest.java 2KB
ReconnectPlugin.java 2KB
FixedLengthProtocol.java 2KB
RpcRequest.java 2KB
Consumer.java 2KB
FixedLengthFrameDecoder.java 2KB
RpcResponse.java 2KB
RateLimiterClient.java 2KB
SenderClient.java 2KB
SpringDemo.java 1KB
BufferUtils.java 1KB
Attachment.java 1KB
MessageProcessor.java 1KB
ReceiverClient.java 1KB
Protocol.java 1KB
SmartServer.java 1KB
MessageProcessorImpl.java 1KB
AbstractPlugin.java 1KB
Plugin.java 1KB
ServerSSLContextFactory.java 1KB
FixedLengthBytesProtocol.java 1KB
BigObjectProtocol.java 1KB
IOUtil.java 1KB
IntegerServerProcessor.java 1KB
RpcProtocol.java 1KB
IntegerClientProcessor.java 1KB
Provider.java 1KB
NettyTimeServerHandler.java 1KB
GroupIo.java 1KB
package-info.java 1KB
共 140 条
- 1
- 2
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6650
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功