package whu.edu.lcrpc.configserver.thread;
/**
* Created by apple on 17/4/20.
*/
import lombok.Data;
import whu.edu.lcrpc.configserver.data.DataSingleClass;
import whu.edu.lcrpc.configserver.entity.requestdata.*;
import whu.edu.lcrpc.configserver.entity.responsedata.HeartBeatResponseDO;
import whu.edu.lcrpc.configserver.entity.responsedata.ResponseDataDO;
import whu.edu.lcrpc.configserver.entity.responsedata.ServiceIPsDO;
import whu.edu.lcrpc.configserver.entity.responsedata.ServiceRegistryResultDO;
import whu.edu.lcrpc.configserver.service.ISendData;
import whu.edu.lcrpc.configserver.service.IUpdateData;
import whu.edu.lcrpc.configserver.service.impl.SendDataImpl;
import whu.edu.lcrpc.configserver.service.impl.UpdateDataImpl;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.UUID;
/**
* 处理socket请求连接线程
*/
@Data
public class DealWithConnectionThread implements Runnable{
private ISendData sendData = null;
private IUpdateData updateData = null;
private Socket socket = null;//长连接发送的数据包DO
private DataSingleClass dataSingle = null;//软负载中心维护的三类数据的访问接口
public DealWithConnectionThread(Socket socket){
this.socket = socket;
this.dataSingle = DataSingleClass.getData();
this.sendData = new SendDataImpl();
this.updateData = new UpdateDataImpl();
}
@Override
public void run() {
//获取输入,序列化后,判断数据包类型,针对不同类型进行不同的处理
/**
* 发送数据包的类型
* 0:服务注册数据包: 更新"聚合数据",并根据订阅关系,推送更新
* 1:服务发布端发送的心跳包: 更新该服务该地址的最后更新时间,即更新"聚合数据"
* 2:服务调用端发送的心跳包
* 3:服务信息查询数据包: 更新"订阅关系",并推送服务地址列表
*/
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
boolean flag = true;
try {
ois = new ObjectInputStream(socket.getInputStream());
oos = new ObjectOutputStream(socket.getOutputStream());
while (flag){
ReqeustDataDO reqeustDataDO = (ReqeustDataDO) ois.readObject();
ResponseDataDO responseDataDO = null;
//根据type值判断数据包的类型
switch (reqeustDataDO.type){
case 0:
responseDataDO = serviceRegistry((ServiceRegistryDO) reqeustDataDO);
break;
case 1:
responseDataDO = serviceProHeartbeat((ServiceProHeartbeatDO) reqeustDataDO);
break;
case 2:
responseDataDO = new HeartBeatResponseDO();
break;
case 3:
responseDataDO = serviceInfoQuery((ServiceQueryDO) reqeustDataDO,socket,oos);
break;
default:break;
}
//发送响应数据
oos.writeObject(responseDataDO);
}
} catch (IOException e) {
e.printStackTrace();
flag = false;
} catch (ClassNotFoundException e) {
e.printStackTrace();
flag = false;
}finally {
try {
if (ois != null)ois.close();
if (oos != null)oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 收到服务注册请求,更新"聚合数据"
* 若该服务存在
* @param serviceRegistryDO
*/
private ResponseDataDO serviceRegistry(ServiceRegistryDO serviceRegistryDO){
ServiceRegistryResultDO registryResultDO = new ServiceRegistryResultDO();
//step1.更新"聚合数据"
String serviceID = serviceRegistryDO.getInterfaceName() + "_" + serviceRegistryDO.getVersion();
if (updateData.addServiceData(serviceRegistryDO.getInterfaceName(),serviceRegistryDO.getVersion(),serviceRegistryDO.getImplClassName(),serviceRegistryDO.getIp())){
//服务注册成功
System.out.println("[" + serviceID + "] 服务注册成功");
//step2. 判断该服务是否有订阅的客户端,若有,则发送该服务信息到所有订阅者
if (dataSingle.subscriptionData.containsKey(serviceID)){
sendData.sendServiceAddrToSubscriber(serviceID);
}
//step3. 增加连接
// String connID = socket.getInetAddress().getHostAddress();
// updateData.addConnection(connID,socket);
// System.out.println("增加一个服务发布者连接[" + connID + "]");
}else {
//服务注册失败
System.out.println("[" + serviceID + "] 服务注册失败");
registryResultDO.flag = false;
}
return registryResultDO;
}
private ResponseDataDO serviceProHeartbeat(ServiceProHeartbeatDO serviceProHeartbeatDO) {
HeartBeatResponseDO heartBeatResponseDO = new HeartBeatResponseDO();
//更新该服务该地址的 心跳包接收时间
String serviceID = serviceProHeartbeatDO.getInterfaceName() + "_" + serviceProHeartbeatDO.getVersion();
// System.out.println("收到服务[" + serviceID + "]发来的心跳包");
heartBeatResponseDO.flag = (updateData.updateServiceAddrUpdateTime(serviceID,serviceProHeartbeatDO.getIp()));
return heartBeatResponseDO;
}
private ResponseDataDO serviceInfoQuery(ServiceQueryDO serviceQueryDO, Socket socket,ObjectOutputStream oos){
ServiceIPsDO serviceIPsDO = new ServiceIPsDO();
serviceIPsDO.setServiceID(serviceQueryDO.getServiceID());
String serviceID = serviceQueryDO.getServiceID();
System.out.println("查询服务[" + serviceID + "]地址列表");
//step1. 查询该服务的地址列表
if (!dataSingle.serviceData.containsKey(serviceID)){
System.out.println("服务[" + serviceID + "不存在");
serviceIPsDO.flag = false;
}else {
//step2. 增加连接数据
String connID = socket.getInetAddress().getHostAddress() + "_" + UUID.randomUUID().toString();
updateData.addConnection(connID,socket,oos);
System.out.println("增加一个服务调用者连接[" + connID + "]");
//step3. 更新订阅关系
updateData.addSubscription(serviceID,connID);
System.out.println("服务[" + serviceID + "]增加一个订阅者[" + connID + "]");
//step4. 获取地址列表
serviceIPsDO.setIps(dataSingle.serviceData.get(serviceID).getIps().keySet());
}
return serviceIPsDO;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
实现一个简单的软负载中心
共98个文件
java:30个
class:30个
xml:22个
5星 · 超过95%的资源 需积分: 10 16 下载量 39 浏览量
2017-04-27
09:49:13
上传
评论 1
收藏 145KB ZIP 举报
温馨提示
具体参考博客《【远程调用框架】如何实现一个简单的RPC框架(五)优化三:软负载中心设计与实现》http://blog.csdn.net/u013177446/article/details/70677800
资源推荐
资源详情
资源评论
收起资源包目录
MyConfigServer.zip (98个子文件)
MyConfigServer
configserver-util
pom.xml 879B
target
maven-status
maven-compiler-plugin
compile
default-compile
inputFiles.lst 2KB
createdFiles.lst 835B
configserver-util-1.0-SNAPSHOT.jar 14KB
classes
whu
edu
lcrpc
configserver
exception
NoServiceFoundException.class 820B
App.class 567B
Constant.class 476B
entity
responsedata
ServiceIPsDO.class 2KB
HeartBeatResponseDO.class 973B
ResponseDataDO.class 446B
ServiceRegistryResultDO.class 989B
ServiceInfoDO.class 4KB
requestdata
ReqeustDataDO.class 403B
ServiceProHeartbeatDO.class 2KB
ServiceRegistryDO.class 3KB
ServiceQueryDO.class 2KB
ServiceConHeartbeatDO.class 977B
maven-archiver
pom.properties 135B
generated-sources
annotations
src
main
java
whu
edu
lcrpc
configserver
exception
NoServiceFoundException.java 332B
App.java 189B
Constant.java 274B
entity
responsedata
ServiceRegistryResultDO.java 342B
ResponseDataDO.java 423B
HeartBeatResponseDO.java 359B
ServiceIPsDO.java 403B
ServiceInfoDO.java 681B
requestdata
ReqeustDataDO.java 470B
ServiceProHeartbeatDO.java 405B
ServiceConHeartbeatDO.java 251B
ServiceQueryDO.java 267B
ServiceRegistryDO.java 454B
configserver-util.iml 870B
pom.xml 2KB
configserver-client
pom.xml 963B
target
maven-status
maven-compiler-plugin
compile
default-compile
inputFiles.lst 933B
createdFiles.lst 429B
configserver-client-1.0-SNAPSHOT.jar 12KB
classes
whu
edu
lcrpc
configserver
ServiceIPInfo.class 2KB
thread
ServiceProviderHeartBeatThread.class 2KB
ServiceConsumerHeartBeatThread.class 3KB
IServiceConsumerClient.class 481B
IServiceProviderClient.class 262B
impl
ServiceProviderClientImpl.class 4KB
ServiceConsumerClientImpl.class 5KB
maven-archiver
pom.properties 137B
generated-sources
annotations
configserver-client.iml 935B
src
main
java
whu
edu
lcrpc
configserver
thread
ServiceConsumerHeartBeatThread.java 2KB
ServiceProviderHeartBeatThread.java 2KB
IServiceConsumerClient.java 617B
ServiceIPInfo.java 1KB
IServiceProviderClient.java 413B
impl
ServiceConsumerClientImpl.java 3KB
ServiceProviderClientImpl.java 3KB
MyConfigServer.iml 773B
.idea
uiDesigner.xml 9KB
misc.xml 9KB
copyright
profiles_settings.xml 74B
modules.xml 739B
encodings.xml 440B
compiler.xml 1KB
workspace.xml 94KB
libraries
Maven__org_springframework_spring_web_4_3_3_RELEASE.xml 636B
Maven__org_springframework_spring_context_4_3_3_RELEASE.xml 664B
Maven__org_springframework_spring_expression_4_3_3_RELEASE.xml 685B
Maven__commons_logging_commons_logging_1_2.xml 585B
Maven__org_springframework_spring_core_4_3_3_RELEASE.xml 643B
Maven__org_springframework_spring_webmvc_4_3_3_RELEASE.xml 657B
Maven__org_springframework_spring_beans_4_3_3_RELEASE.xml 650B
Maven__org_projectlombok_lombok_1_16_10.xml 558B
Maven__org_springframework_spring_aop_4_3_3_RELEASE.xml 636B
configerver-server
pom.xml 2KB
target
configserver-server-1.0-SNAPSHOT.jar 17KB
maven-status
maven-compiler-plugin
compile
default-compile
inputFiles.lst 1KB
createdFiles.lst 583B
classes
whu
edu
lcrpc
configserver
StartServer.class 776B
thread
WaitForConnectionThread.class 1KB
CheckServiceAddrThread.class 4KB
DealWithConnectionThread.class 9KB
DeletClosedConnectionThread.class 2KB
data
DataSingleClass.class 1KB
service
ISendData.class 196B
IUpdateData.class 482B
impl
UpdateDataImpl.class 3KB
SendDataImpl.class 3KB
applicationContext.xml 518B
maven-archiver
pom.properties 137B
generated-sources
annotations
src
main
resources
applicationContext.xml 518B
java
whu
edu
lcrpc
configserver
thread
DealWithConnectionThread.java 7KB
CheckServiceAddrThread.java 2KB
DeletClosedConnectionThread.java 1KB
WaitForConnectionThread.java 862B
StartServer.java 910B
data
DataSingleClass.java 2KB
service
ISendData.java 316B
IUpdateData.java 1022B
impl
UpdateDataImpl.java 2KB
SendDataImpl.java 2KB
configerver-server.iml 2KB
共 98 条
- 1
资源评论
- BradXue2017-09-25资源质量没有看,语言是Java 语言的。
吃货小跟班
- 粉丝: 64
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功