package persistent.prestige.demo.netty.dubbo.net;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import persistent.prestige.demo.netty.dubbo.net.heart.HeartExecutorThreadPool;
import persistent.prestige.demo.netty.dubbo.net.heart.HeartTask;
import persistent.prestige.demo.netty.dubbo.utils.LogUtils;
/**
* netty 客户端 connection 一个NettyConnection 就是一条长连接
* @author prestigeding@126.com
*
*/
public class NettyClientConnection {
private String serverIp;
private int serverPort;
private boolean connect = false;
private Bootstrap bootstrap;
private NioEventLoopGroup workGroup;
private ChannelFuture channelFuture;
//客户端连接请求
private static final ConcurrentHashMap<Integer, ResponseFuture> requestMap = new ConcurrentHashMap<Integer, ResponseFuture>();
private static final AtomicInteger seq = new AtomicInteger(0);
private HeartExecutorThreadPool heartExecutor = null;
/**
*
* @param serverIp
* @param serverPort
*/
public NettyClientConnection(String serverIp, int serverPort) {
this.serverIp = serverIp;
this.serverPort = serverPort;
workGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// TODO Auto-generated method stub
ch.pipeline().addLast(new ResponseFrameDecoder())
.addLast(new RequestFrameEncoder())
.addLast(new DispatchHandler());
}
});
}
private void connect() {
if(!connect) {
synchronized (this) {
try {
if(!connect) {
channelFuture = bootstrap.connect(serverIp, serverPort).sync();
heartExecutor = new HeartExecutorThreadPool(0, this, 3);
connect = true;
heartExecutor.scheduleAtFixedRate(new HeartTask(this), 1, 3, TimeUnit.SECONDS);//单位为秒
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new NullPointerException("连接服务端失败");
}
}
}
}
/**
* 发送服务调用
* @param service
* @param params
* @return
*/
public ResponseFuture sendMsg(String service, String params) {
Integer requestId = seq.incrementAndGet();
System.out.println("客户端requestId:" + requestId);
return send(new Request(requestId, (short)1, service, params));
}
/**
* 发送心跳包
* @return
*/
public ResponseFuture sendHeatMsg() {
Integer requestId = seq.incrementAndGet();
return send(new Request(requestId, (short)2));
}
private ResponseFuture send(Request request) {
connect();
ResponseFuture responseFuture = new ResponseFuture();
requestMap.putIfAbsent(request.getRequestId(), responseFuture);
channelFuture.channel().writeAndFlush(request);
return responseFuture;
}
/**
* 关闭连接
*/
public void close() {
try {
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
workGroup.shutdownGracefully();
if(heartExecutor != null) {
heartExecutor.shutdownNow();
}
}
}
/**
* 客户端命令转发器
* @author prestigeding@126.com
*
*/
private class DispatchHandler extends ChannelHandlerAdapter {
/**
* 得到服务端的响应后,放入结果中,供客户端使用结果
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
LogUtils.log("客户端收到服务端响应,并开始处理数据");
Response response = (Response)msg;
LogUtils.log("服务端响应requestId:", response.getRequestId());
ResponseFuture f = requestMap.remove(response.getRequestId());
if(f != null) {
f.setResult(response);
} else {
LogUtils.log("警告:客户端丢失请求信息");
}
}
}
}
评论6