package net.qiujuer.library.clink.impl;
import net.qiujuer.library.clink.core.IoProvider;
import net.qiujuer.library.clink.utils.CloseUtils;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class IoSelectorProvider implements IoProvider {
// Flipped to true exactly once by close(); both selector loops poll it as their exit condition.
private final AtomicBoolean isClosed = new AtomicBoolean(false);
// Whether a registration is currently in progress. Each flag is also the monitor object
// that registerSelection and waitSelection synchronize on for the matching selector.
private final AtomicBoolean inRegInput = new AtomicBoolean(false);
private final AtomicBoolean inRegOutput = new AtomicBoolean(false);
// Separate selectors: one is polled by the read thread, the other by the write thread.
private final Selector readSelector;
private final Selector writeSelector;
// Callbacks keyed by the SelectionKey obtained at registration time.
// NOTE(review): these are plain HashMaps touched from several threads (registration,
// un-registration, selector loops) — confirm whether external synchronization is intended.
private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();
private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();
// Executors handed to handleSelection together with the matching callback map.
private final ExecutorService inputHandlePool;
private final ExecutorService outputHandlePool;
public IoSelectorProvider() throws IOException {
readSelector = Selector.open();
writeSelector = Selector.open();
inputHandlePool = Executors.newFixedThreadPool(4,
new IoProviderThreadFactory("IoProvider-Input-Thread-"));
outputHandlePool = Executors.newFixedThreadPool(4,
new IoProviderThreadFactory("IoProvider-Output-Thread-"));
// 开始输出输入的监听
startRead();
startWrite();
}
/**
 * Starts the max-priority thread that polls {@code readSelector} until the provider is closed.
 *
 * <p>Each ready, still-valid key is passed to {@code handleSelection} together with the input
 * callback map and the input pool. When {@code select()} returns 0 the loop first goes through
 * {@code waitSelection(inRegInput)} so that a registration in progress can complete.
 */
private void startRead() {
    Thread thread = new Thread("Clink IoSelectorProvider ReadSelector Thread") {
        @Override
        public void run() {
            while (!isClosed.get()) {
                try {
                    if (readSelector.select() == 0) {
                        // Woken up without ready keys: give a pending registration the chance to finish.
                        waitSelection(inRegInput);
                        continue;
                    }
                    Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        if (selectionKey.isValid()) {
                            handleSelection(selectionKey, SelectionKey.OP_READ, inputCallbackMap, inputHandlePool);
                        }
                    }
                    System.out.println("有数据需要读取:"+selectionKeys.size());
                    // The selected-key set is not emptied by the selector itself.
                    selectionKeys.clear();
                } catch (ClosedSelectorException e) {
                    // close() may close the selector between the isClosed check and the selector
                    // call; that is a normal shutdown, so leave the loop instead of dying with an
                    // uncaught exception.
                    break;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    thread.setPriority(Thread.MAX_PRIORITY);
    thread.start();
}
/**
 * Starts the max-priority thread that polls {@code writeSelector} until the provider is closed.
 *
 * <p>Each ready, still-valid key is passed to {@code handleSelection} together with the output
 * callback map and the output pool. When {@code select()} returns 0 the loop first goes through
 * {@code waitSelection(inRegOutput)} so that a registration in progress can complete.
 */
private void startWrite() {
    Thread thread = new Thread("Clink IoSelectorProvider WriteSelector Thread") {
        @Override
        public void run() {
            while (!isClosed.get()) {
                try {
                    if (writeSelector.select() == 0) {
                        // Woken up without ready keys: give a pending registration the chance to finish.
                        waitSelection(inRegOutput);
                        continue;
                    }
                    Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        if (selectionKey.isValid()) {
                            handleSelection(selectionKey, SelectionKey.OP_WRITE, outputCallbackMap, outputHandlePool);
                        }
                    }
                    // The selected-key set is not emptied by the selector itself.
                    selectionKeys.clear();
                } catch (ClosedSelectorException e) {
                    // close() may close the selector between the isClosed check and the selector
                    // call; that is a normal shutdown, so leave the loop instead of dying with an
                    // uncaught exception.
                    break;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    thread.setPriority(Thread.MAX_PRIORITY);
    thread.start();
}
/**
 * Registers {@code channel} for read readiness on the read selector.
 *
 * @param channel  channel to watch
 * @param callback callback stored for the channel's selection key
 * @return {@code true} when a selection key was obtained, {@code false} when
 *         {@code registerSelection} returned {@code null}
 */
@Override
public boolean registerInput(SocketChannel channel, HandleInputCallback callback) {
    SelectionKey registeredKey = registerSelection(channel, readSelector, SelectionKey.OP_READ,
            inRegInput, inputCallbackMap, callback);
    return registeredKey != null;
}
/**
 * Registers {@code channel} for write readiness on the write selector.
 *
 * @param channel  channel to watch
 * @param callback callback stored for the channel's selection key
 * @return {@code true} when a selection key was obtained, {@code false} when
 *         {@code registerSelection} returned {@code null}
 */
@Override
public boolean registerOutput(SocketChannel channel, HandleOutputCallback callback) {
    SelectionKey registeredKey = registerSelection(channel, writeSelector, SelectionKey.OP_WRITE,
            inRegOutput, outputCallbackMap, callback);
    return registeredKey != null;
}
/**
 * Stops read monitoring for {@code channel} by delegating to {@code unRegisterSelection}
 * with the read selector and the input callback map.
 *
 * @param channel channel whose read registration should be removed
 */
@Override
public void unRegisterInput(SocketChannel channel) {
unRegisterSelection(channel, readSelector, inputCallbackMap);
}
/**
 * Stops write monitoring for {@code channel} by delegating to {@code unRegisterSelection}
 * with the write selector and the output callback map.
 *
 * @param channel channel whose write registration should be removed
 */
@Override
public void unRegisterOutput(SocketChannel channel) {
unRegisterSelection(channel, writeSelector, outputCallbackMap);
}
/**
 * Shuts the provider down. Only the first call has any effect; later calls return immediately.
 *
 * <p>Both handler pools are shut down, both callback maps are emptied, and both selectors
 * are woken up and then closed.
 */
@Override
public void close() {
    boolean firstClose = isClosed.compareAndSet(false, true);
    if (!firstClose) {
        return;
    }

    inputHandlePool.shutdown();
    outputHandlePool.shutdown();

    inputCallbackMap.clear();
    outputCallbackMap.clear();

    // Wake the polling threads so they can observe isClosed, then release the selectors.
    readSelector.wakeup();
    writeSelector.wakeup();
    CloseUtils.close(readSelector, writeSelector);
}
/**
 * Blocks the calling thread while a registration guarded by {@code locker} is in progress.
 *
 * <p>Entering the monitor already waits for a thread that is inside
 * {@code registerSelection} with the same {@code locker}. If the flag is still set once the
 * monitor is held, the thread waits until it is cleared. The wait is done in a loop so that
 * a wakeup that arrives while the flag is still set does not end the wait early.
 *
 * @param locker flag and monitor object shared with {@code registerSelection}
 */
private static void waitSelection(final AtomicBoolean locker) {
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (locker) {
        while (locker.get()) {
            try {
                locker.wait();
            } catch (InterruptedException e) {
                // Same outcome as before: report the interruption and stop waiting.
                e.printStackTrace();
                return;
            }
        }
    }
}
/**
 * Registers {@code channel} with {@code selector} for {@code registerOps}, or adds
 * {@code registerOps} to the interest set of an existing registration.
 *
 * <p>The whole operation runs while holding {@code locker}'s monitor with the flag set to
 * {@code true}; the flag is cleared and one waiter is notified before the monitor is
 * released. The callback is stored in {@code map} only when a new key is created; an existing
 * key keeps whatever callback is already mapped to it.
 *
 * @param channel     channel to register
 * @param selector    selector to register with
 * @param registerOps interest operation(s) to enable
 * @param locker      flag and monitor object shared with {@code waitSelection}
 * @param map         callback map to update for a new key
 * @param runnable    callback to associate with a new key
 * @return the selection key, or {@code null} if the channel is closed
 */
private static SelectionKey registerSelection(SocketChannel channel, Selector selector,
                                              int registerOps, AtomicBoolean locker,
                                              HashMap<SelectionKey, Runnable> map,
                                              Runnable runnable) {
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (locker) {
        // Mark the registration as in progress.
        locker.set(true);
        try {
            // Wake the selector so that it is not sitting inside select() during registration.
            selector.wakeup();

            SelectionKey key = null;
            if (channel.isRegistered()) {
                // Reuse an existing registration with this selector, if there is one.
                key = channel.keyFor(selector);
                if (key != null) {
                    // Extend the current INTEREST set. Building it from readyOps() would drop
                    // every operation that simply is not ready at this moment.
                    key.interestOps(key.interestOps() | registerOps);
                }
            }

            if (key == null) {
                // First registration with this selector: obtain the key and store the callback.
                key = channel.register(selector, registerOps);
                map.put(key, runnable);
            }
            return key;
        } catch (ClosedChannelException e) {
            return null;
        } finally {
            // Registration finished: clear the flag and release a waiting selector thread.
            // notify() cannot fail here because this thread owns the monitor.
            locker.set(false);
            locker.notify();
        }
    }
}
/**
 * Cancels {@code channel}'s registration with {@code selector}, if there is one.
 *
 * <p>When a key exists it is cancelled, its entry is removed from {@code map}, and the
 * selector is woken up. Nothing happens when the channel is not registered or has no key
 * for this selector.
 *
 * @param channel  channel to stop monitoring
 * @param selector selector the channel may be registered with
 * @param map      callback map from which the key's entry is removed
 */
private static void unRegisterSelection(SocketChannel channel, Selector selector,
                                        Map<SelectionKey, Runnable> map) {
    if (!channel.isRegistered()) {
        return;
    }

    SelectionKey registration = channel.keyFor(selector);
    if (registration == null) {
        return;
    }

    // Cancel monitoring, drop the callback, then wake the selector.
    registration.cancel();
    map.remove(registration);
    selector.wakeup();
}
private static void handleSelection(SelectionKey key, int keyOps,
HashMap<SelectionKey, Runnable> map,
ExecutorService pool) {
// 重点
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
WebSocket Socket TCP/UDP (500个子文件)
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
gradlew.bat 2KB
.gitignore 143B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 17B
.gitignore 10B
.gitignore 7B
settings.gradle 4KB
build.gradle 941B
build.gradle 546B
build.gradle 434B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
build.gradle 222B
共 500 条
- 1
- 2
- 3
- 4
- 5
资源评论
MicrophoneBen
- 粉丝: 4
- 资源: 5
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功