package com.zhihao.netty.rpc.clientconsumer.consumer;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.zhihao.netty.rpc.clientconsumer.consumer.common.NettyConsumerContext;
import com.zhihao.netty.rpc.clientconsumer.consumer.handler.MyChannelInitializer;
import com.zhihao.netty.rpc.entity.InvokeResult;
import com.zhihao.netty.rpc.entity.MessageProtocol;
import com.zhihao.netty.rpc.entity.RegistrationInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Author: ZhiHao
* @Date: 2022/6/29 17:37
* @Description:
* @Versions 1.0
**/
@Slf4j
public class NettyClient {
private String ip;
private Integer port;
private String serviceName;
private Map<String, ChannelHandlerContext> channelHandlerContextMap;
private Map<String, SynchronousQueue<?>> resultConcurrentHashMap;
public NettyClient(RegistrationInfo info) {
if (StrUtil.isBlank(info.getIp())) {
throw new RuntimeException("提供者注册信息有误!!!");
}
this.ip = info.getIp();
this.port = info.getPort();
this.serviceName = info.getServiceName();
this.channelHandlerContextMap = NettyConsumerContext.channelHandlerContextMap;
this.resultConcurrentHashMap = NettyConsumerContext.resultConcurrentHashMap;
}
public static NettyClient buildClient(RegistrationInfo info) {
NettyClient nettyClient = new NettyClient(info);
nettyClient.initNettyClient();
boolean flag = nettyClient.channelHandlerContextMap.containsKey(nettyClient.serviceName);
log.info("initNettyClient===serviceName:{}, 初始化客户端结果:{}", nettyClient.serviceName, flag);
return flag ? nettyClient : null;
}
public Object doRemotelyInvoke(Method method, Object[] args) throws TimeoutException {
ChannelHandlerContext channelHandlerContext = channelHandlerContextMap.get(serviceName);
if (null == channelHandlerContextMap) {
throw new RuntimeException("服务提供者已不存在!!!");
}
MessageProtocol protocol = new MessageProtocol();
String requestId = UUID.fastUUID().toString() + this.getCurrentTimeSecond();
String simpleName = method.getDeclaringClass().getSimpleName();
protocol.setClassName(StrUtil.lowerFirst(simpleName));
protocol.setMethodName(method.getName());
protocol.setMethodParameterTypes(method.getParameterTypes());
protocol.setMethodParameter(args);
protocol.setRequestId(requestId);
// 使用同步阻塞队列, 阻塞等待, 别的线程插入结果, 有结果线程在继续
SynchronousQueue<?> synchronousQueue = new SynchronousQueue<>();
// 以UUID+时间戳作为唯一key
resultConcurrentHashMap.put(requestId, synchronousQueue);
String jsonStr = JSONUtil.toJsonStr(protocol);
channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes(StandardCharsets.UTF_8)));
log.info("doRemotelyInvoke===已经发送, jsonStr:{}!!!!", jsonStr);
Class<?> returnType = method.getReturnType();
InvokeResult take = null;
long second;
try {
second = this.getCurrentTimeSecond();
take = (InvokeResult) synchronousQueue.poll(30L, TimeUnit.SECONDS);
second = this.getCurrentTimeSecond() - second;
} catch (Exception e) {
throw new RuntimeException(e);
}
if (second >= 30L) {
resultConcurrentHashMap.remove(requestId);
throw new TimeoutException("调用超时!!!");
}
Integer resultCode = Optional.ofNullable(take).map(InvokeResult::getResultCode).orElse(-1);
Object result = null;
if (resultCode.equals(1)) {
throw new RuntimeException(take.getFailMessage());
} else {
String resultStr = Optional.ofNullable(take).map(InvokeResult::getInvokeResult)
.map(Objects::toString).orElse("");
if (JSONUtil.isJsonArray(resultStr)) {
result = JSONUtil.toList(resultStr, returnType);
} else if (JSONUtil.isJsonObj(resultStr)) {
result = JSONUtil.toBean(resultStr, returnType);
} else if (ClassUtil.isBasicType(returnType)) {
result = resultStr;
} else {
result = resultStr;
}
}
return result;
}
private void initNettyClient() {
// 以常量池中的服务名称作为锁对象
synchronized (serviceName.intern()) {
try {
// 不包含则初始化
if (!channelHandlerContextMap.containsKey(serviceName)) {
new Thread(() -> {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.option(ChannelOption.SO_TIMEOUT, 3)
.handler(new MyChannelInitializer(serviceName));
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(ip, port));
channelFuture.addListener(el -> {
if (el.isSuccess()) {
log.info("initNettyClient===serviceName:{}, 连接成功!!!", serviceName);
} else {
log.error("initNettyClient===serviceName:{}, 连接失败!!!", serviceName);
}
});
try {
channelFuture.sync().channel().closeFuture().sync();
} catch (Throwable e) {
log.error("initNettyClient===serviceName:{}, 发生异常, 信息:{}", serviceName, e);
} finally {
workerGroup.shutdownGracefully();
}
}).start();
}
log.info("initNettyClient===serviceName:{}, 开始等待初始化客户端完成", serviceName);
// 以常量池中不可变引用服务字符串
serviceName.intern().wait(5000L);
} catch (Exception e) {
log.error("initNettyClient===serviceName:{}, 初始化客户端发生异常, 信息:{}", serviceName, e);
}
}
}
private long getCurrentTimeSecond() {
return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源详情
资源评论
资源推荐
收起资源包目录
自己学习netty的笔记和Demo (163个子文件)
.gitignore 395B
nettyDemo.iml 12KB
NettyClient.java 8KB
NIOSelectorServiceUnfinishedDemo.java 7KB
NIOSelectorServiceDemo.java 6KB
GroupChatNIOService.java 5KB
NIOSelectorClientDemo.java 5KB
GetRegistrationInfo.java 5KB
GroupChatNIOClient1.java 4KB
GroupChatNIOClient2.java 4KB
GroupChatNIOClient3.java 4KB
GroupChatServerChannelHandler.java 4KB
RetryStrategy.java 4KB
WebSocketServer.java 4KB
NettyHttpServerHandler.java 4KB
ProvideChannelHandler.java 4KB
HeartbeatServer.java 4KB
MyRegisteredChannelHandler.java 3KB
NettyServerHandler.java 3KB
NettyService.java 3KB
NettySolveTcpService.java 3KB
NotBlockNIOService.java 3KB
NettyClient.java 3KB
GroupChatClient.java 3KB
NettyGroupServiceTwo.java 3KB
ServiceRegistration.java 3KB
NettyRetryClientTwo.java 3KB
MyConsumerHandler.java 3KB
NettySolveTcpClient.java 3KB
HeartbeatReportClient.java 3KB
NettyRetryClient.java 3KB
HeartbeatServerHandle.java 3KB
HeartbeatReportServer.java 2KB
GroupChatServer.java 2KB
MyProvide.java 2KB
NettyGroupServiceHandle.java 2KB
MyLenghtFieldBasedFrameDecode.java 2KB
NettyTcpService.java 2KB
HeartbeatClient.java 2KB
NettyHttpClientHandler.java 2KB
BlockNIOService.java 2KB
MyGetRegistrationHandler.java 2KB
NettyClientHandler.java 2KB
MyRegistrationCenter.java 2KB
NettyGroupService.java 2KB
MyProduceRegistrationHandler.java 2KB
NettyInvocationHandler.java 2KB
NettyGroupClient.java 2KB
NotBlockNIOClient1.java 2KB
BlockNIOClient1.java 2KB
NettyTcpClient.java 2KB
MyWebSocketServerMessageHandler.java 2KB
HttpPipelineInitializer.java 2KB
HeartbeatClientReportHandle.java 1KB
ProvideChannelInitializer.java 1KB
MyChannelInitializer.java 1KB
ProvideStart.java 1KB
NettyGroupClientHandle.java 1KB
NettyHttpService.java 1KB
NettySolveTcpClientHandle.java 1KB
NettyGroupServiceHandleTwo.java 1KB
RegistrationFuture.java 1KB
NettySolveTcpServiceHandle.java 1KB
NettyHttpClient.java 1KB
MyRegisteredChannelInitializer.java 1KB
NettyTcpServiceHandle.java 1KB
MyMessageProtocolEncoder.java 1KB
NettyTcpClientHandle.java 1KB
ConsumerStart.java 984B
NettyRetryClientHandle.java 922B
GroupChatClientChannelHandler.java 893B
HeartbeatClientHandle.java 886B
NettyConsumerContext.java 802B
InvokeResult.java 784B
ConsumerFactory.java 758B
HeartbeatReportServerHandle.java 719B
MessageProtocol.java 674B
RegistrationInfo.java 525B
NettyProduceContext.java 500B
MyRPCTestImpl.java 487B
MessageProtocol.java 455B
NettyDemoApplication.java 321B
NettyDemoApplicationTests.java 286B
MyRPCTest.java 215B
MyRPCTest.java 215B
Netty网络框架学习笔记-1(NIO知识_2022-01-20).md 50KB
Netty网络框架学习笔记-20(实现一个简单RPC-2).md 37KB
Netty网络框架学习笔记-18(NioEventLoop源码分析).md 28KB
Netty网络框架学习笔记-3(Netty入门_2022-02-17).md 21KB
Netty网络框架学习笔记-4(Netty核心知识_2022-02-21).md 20KB
Netty网络框架学习笔记-11(TCP 粘包和拆包).md 18KB
Netty网络框架学习笔记-12(Netty核心源码剖析-启动).md 17KB
Netty网络框架学习笔记-17(客户端断线重连) .md 17KB
Netty网络框架学习笔记-5(Netty入门HTTP服务_2022-02-21).md 16KB
Netty网络框架学习笔记-7((心跳)检测空闲连接以及超时).md 15KB
Netty网络框架学习笔记-6(Netty简单实现一个群聊_2022-04-14).md 15KB
Netty网络框架学习笔记-2(NIO实现一个群聊_2022-01-26).md 15KB
Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析).md 12KB
Netty网络框架学习笔记-8(WebSocket实现服务器和客户端长连接).md 12KB
Netty网络框架学习笔记-9(编码与解码器机制).md 9KB
共 163 条
- 1
- 2
懵懵懂懂程序员
- 粉丝: 3494
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0