package com.telesound.comunict.dao.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import com.telesound.comunict.dao.ComManagerServerManager;
import com.telesound.comunict.vo.CommData;
public class ComManagerServerManagerImpl implements ComManagerServerManager {
private SocketChannel socketChannel;// 客户端通道
private Selector selector = null;// 选择器
private ManagerReadThread mrt; // 读取线程
private int countRead;// 调用readCushFile方法的线程数必须得同步
private List<byte[]> events = null;// 获取消息队列容器
//private List<CommData> alarms = new ArrayList<CommData>();// 特殊消息队列1
//private List<CommData> faults = new ArrayList<CommData>();// 特殊消息队列2
private int outTimes;
//private DataAnalysis dataAnalys ; //解析类,请自行创建
private InetSocketAddress inetSocketAddress ;
public List<byte[]> getEvents() {
return events;
}
public void setEvents(List<byte[]> events) {
this.events = events;
}
/**
* 创建连接接口
*
* @param serverIp
* @param port
* @throws IOException
*/
public void creatManageNet(String serverIp, int port) throws IOException {
this.inetSocketAddress = new InetSocketAddress(serverIp,
port);
initialize(inetSocketAddress);
}
// 建立客户端连接
private void initialize(InetSocketAddress inetSocketAddress)
throws IOException {
// 打开监听通道
socketChannel = SocketChannel.open(inetSocketAddress);
// 设置非阻塞模式
socketChannel.configureBlocking(false);
// this.list = new ArrayList<byte[]>();
// 打开选择器并注册到信道
selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ
| SelectionKey.OP_WRITE);// 读取集
// 启动读取的线程----
mrt = new ManagerReadThread(selector);
mrt.start();
}
/**
* 发送数据到服务器
*
* @param byout
* @throws IOException
*/
public void sendData(byte[] byout) throws IOException {
ByteBuffer writeBuf = ByteBuffer.wrap(byout);
socketChannel.write(writeBuf);
socketChannel.register(selector, SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
/**
* 读取缓冲区数据/消息队列(list)
*
* @param order
* 指令
* @return CommData 封装了的通讯数据
*/
public synchronized List<CommData> readCush(String order) {
countRead++;
if (countRead > 1) { // 当有线程调用读取的时候进来的线程等待
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
boolean b = true;
// 开始读取数据
List<CommData> cdatas = new ArrayList<CommData>();
while (b) {
events = new ArrayList<byte[]>();
this.setEvents(mrt.getBlist());
mrt.getBlist().removeAll(this.events);// 将消息队列传出,并清理,使得读线程能继续往消息队列中存放数据
// 获取有效数据
if (this.getEvents() != null && this.getEvents().size() > 0) {
for (int i = 0; i < this.getEvents().size(); i++) {
CommData cdata = processData(this.getEvents().get(i), order);
//处理并封装接收数据
if (cdata.getOrder().equals(order)) {
cdatas.add(cdata);
}
this.getEvents().remove(i);
}
}
if (this.getEvents().size() > 0 || cdatas.size() > 0) {
b = false;
}
}
// 释放阻塞的线程
countRead--;
this.notify();
return cdatas;
}
// 解析封装数据
private CommData processData(byte[] bytes, String order) {
CommData cdata = null;
// 先循环解析前28个字节,若不匹配,依次递增,如果指令匹配,则封装数据并退出循环
boolean isOrder = false;
int sectionNum = 1;// 截取段落计数如第一段12-28指令不匹配则截取第二段12*2-28*2,只截取有效数据
while (!isOrder) {
if (28 * sectionNum <= bytes.length) {
// 当前字节中指令
String currOrder = HexConversion
.bytesToHexString(DataAnalysis.copyByteArray(bytes,
12 * sectionNum, 16 * sectionNum));
if (currOrder.compareTo(order) == 0) {
cdata = new CommData();
// 数据长度4byte
int dataLength = HexConversion.byte2Int(DataAnalysis
.copyByteArray(bytes, 16 * sectionNum,
20 * sectionNum));
// 会话ID 4byte
int convId = HexConversion.byte2Int(DataAnalysis
.copyByteArray(bytes, 20 * sectionNum,
24 * sectionNum));
// 流水号 4byte;
int serNum = HexConversion.byte2Int(DataAnalysis
.copyByteArray(bytes, 24 * sectionNum,
28 * sectionNum));
byte[] data = DataAnalysis.copyByteArray(bytes,
28 * sectionNum, 28 * sectionNum + dataLength);
cdata.setOrder(currOrder);
cdata.setConvId(convId);
cdata.setDataLength(dataLength);
cdata.setSerNum(serNum);
cdata.setData(data);
}
sectionNum++;
} else {
return cdata;
}
}
return cdata;
}
/**
* 登录服务器
*
* @param serverIp
* @param port
* @param username
* @param password
* @param key
* @param sid
* @return 0成功,-1失败
* @throws IOException
*/
public int loginManager(String serverIp, int port, String username,
String password, String key, String sid) throws IOException {
Map data = new HashMap();
data.put("serverIp", serverIp);
data.put("port", port);
data.put("username", username);
data.put("password", password);
data.put("key", key);
data.put("sid", sid);
byte[] byout = this.dataAnalys.outLoginManager(data);
this.sendData(byout);
List<CommData> cdatas = readCush(ComunictConst.LOGIN_MANAGER_ORDER_RECIVE);
data = this.dataAnalys.inLoginManager(cdatas.get(0).getData());
return Integer.parseInt(data.get("status").toString());
}
/**
* 登出
*/
public void logoutManager() {
byte[] byout = dataAnalys.outLoginoutManager(null);
try {
this.sendData(byout);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 握手
*/
public void comhandNet() {
Timer timer = new Timer();
// handC = new ChannelReadClient(selector);
timer.schedule(new TimerTask() {
public void run() {
// new Thread(new ReadThread(selector)).start();
byte[] handSend = new byte[28];
dataAnalys.packageHead(ComunictConst.COMHAND_MANAGER_ORDER_SEND, 0,
dataAnalys.getConversationId(), dataAnalys
.getSerialNum(), handSend);// 封装报头
try {
sendData(handSend);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (countRead < 1) {
List<CommData> hands = readCush(ComunictConst.COMHAND_MANAGER_ORDER_RECIVE);
if (hands.get(0) == null) {// 判断是否有服务器回应
outTimes++;// 没有的话超时次数累加
} else {
outTimes = 0;// 若有返回数据则超时次数清零
}
}
if (outTimes > 10) {// 超过10次也就是30秒无响应则重新初始化每个3秒初始化一次
try {
initialize(inetSocketAddress);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}, 0, 3000);
}
}
java.nio(socket异步通讯完整版)
3星 · 超过75%的资源 需积分: 50 111 浏览量
2010-05-20
16:34:38
上传
评论 7
收藏 17KB RAR 举报
winbps
- 粉丝: 31
- 资源: 8
最新资源
- C语言基础-C语言编程基础之Leetcode编程题解之第39题组合总和.zip
- C语言基础-C语言编程基础之Leetcode编程题解之第38题外观数列.zip
- C语言基础-C语言编程基础之Leetcode编程题解之第37题解数独.zip
- C语言基础-C语言编程基础之Leetcode编程题解之第36题有效的数独.zip
- C语言基础-C语言编程基础之Leetcode编程题解之第35题搜索插入位置.zip
- index.wxml
- C语言基础-C语言编程基础之Leetcode编程题解之第33题搜索旋转排序数组.zip
- 基于Python实现的手写数字识别系统源码.zip
- 从网页提取禁止转载的文字
- C语言基础-C语言编程基础之Leetcode编程题解之第32题最长有效括号.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
- 1
- 2
- 3
- 4
- 5
- 6
前往页