package cn.tojintao.snowflake;
import cn.tojintao.snowflake.common.PropertyFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* @author cjt
* @date 2022/5/30 22:41
*/
@Slf4j
public class SnowflakeZookeeperHolder {
private String zk_AddressNode = null; //保存自身的key ip:port-000000001
private String listenAddress; //保存自身的key ip:port
private int workerID;
private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("spray.name");
private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有数据持久的节点
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("spray.name") + "/sprayconf/{port}/workerID.properties";
private String ip;
private String port;
private String connectionString;
private long lastUpdateTime;
public SnowflakeZookeeperHolder(String ip, String port, String connectionString) {
this.ip = ip;
this.port = port;
this.listenAddress = ip + ":" + port;
this.connectionString = connectionString;
}
private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
}
public boolean init() {
try {
CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
curator.start();
Stat stat = curator.checkExists().forPath(PATH_FOREVER);
if (stat == null) {
//不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
zk_AddressNode = createNode(curator);
//在本地缓存workerId,默认是0
updateLocalWorkerID(workerID);
//定时上报本机时间给forever节点
ScheduledUploadData(curator, zk_AddressNode);
return true;
} else {
Map<String, Integer> nodeMap = Maps.newHashMap(); //ip:port->00001
Map<String, String> realMap = Maps.newHashMap(); //ip:port->(ip:port-000001)
//存在根节点,先检查是否有属于自己的根节点
List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
for (String key : keys) {
String[] nodeKey = key.split("-");
realMap.put(nodeKey[0], key);
nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
}
//获取zk上曾经记录的workerId,这里可以看出workerId的分配是依靠zk的自增序列号
Integer worker_id = nodeMap.get(listenAddress);
if (worker_id != null) {
//有自己的节点,zk_AddressNode = /snowflake/${leaf.name}/forever+ip:port-0000001
zk_AddressNode = PATH_FOREVER + "/" + realMap.get(listenAddress);
workerID = worker_id;
//检查该节点当前的系统时间是否在最后一次上报时间之后
if (!checkInitTimeStamp(curator, zk_AddressNode)) {
throw new RuntimeException("init timestamp check error,forever node timestamp gt this node time");
}
//准备创建临时节点
ScheduledUploadData(curator, zk_AddressNode);
updateLocalWorkerID(workerID);
log.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workerID-{} childNode and start SUCCESS", ip, port, workerID);
} else {
//表示新启动的节点,创建持久节点 ,不用check时间
String newNode = createNode(curator);
zk_AddressNode = newNode;
String[] nodeKey = newNode.split("-");
workerID = Integer.parseInt(nodeKey[1]);
ScheduledUploadData(curator, zk_AddressNode);
updateLocalWorkerID(workerID);
log.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workerID-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
}
}
} catch (Exception e) {
log.error("Start node ERROR {}", e);
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
log.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception ex) {
log.error("Read file error ", ex);
return false;
}
}
return true;
}
/**
* 在节点文件系统上缓存一个workerID值,当zk失效,机器重启时保证能够正常启动
*
* @param workerID
*/
private void updateLocalWorkerID(int workerID) {
File confFile = new File(PROP_PATH.replace("{port}", port));
boolean exists = confFile.exists();
log.info("file exists status is {}", exists);
if (exists) {
try {
//更新覆盖workerID的缓存文件
FileUtils.writeStringToFile(confFile, "workerID=" + workerID, false);
log.info("update file cache workerID is {}", workerID);
} catch (IOException e) {
log.error("update file cache error ", e);
}
} else {
//不存在文件,父目录页肯定不存在
try {
boolean mkdirs = confFile.getParentFile().mkdirs();
log.info("init local file cache create parent dis status is {}, workerID is {}", mkdirs, workerID);
if (mkdirs) {
FileUtils.writeStringToFile(confFile, "workerID=" + workerID, false);
log.info("local file cache workerID is {}", workerID);
} else {
log.warn("create parent dir error");
}
} catch (IOException e) {
log.warn("create workerID conf file error", e);
}
}
}
private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "schedule-upload-time");
thread.setDaemon(true);
return thread;
}
}).scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateNewData(curator,
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。 2、适用人群:主要针对计算机相关专业(如计科、信息安全、数据科学与大数据技术、人工智能、通信、物联网、数学、电子信息等)的同学或企业员工下载使用,具有较高的学习借鉴价值。 3、不仅适合小白学习实战练习,也可作为大作业、课程设计、毕设项目、初期项目立项演示等,欢迎下载,互相学习,共同进步!
资源推荐
资源详情
资源评论
收起资源包目录
基于springcloud+Netty+MQ+mysql的分布式即时聊天系统完整源码+数据库+说明.zip (203个子文件)
bootstrap.min.css 118KB
bootstrap.min.css 118KB
layui.css 73KB
layui.css 73KB
chat.css 7KB
chat.css 7KB
.gitignore 278B
.gitignore 227B
friend.html 9KB
friend.html 9KB
group.html 6KB
group.html 6KB
manage.html 5KB
manage.html 5KB
message.html 5KB
message.html 5KB
request.html 4KB
request.html 4KB
chat.html 3KB
chat.html 3KB
index.html 2KB
index.html 2KB
admin_index.html 2KB
admin_index.html 2KB
function.html 1KB
function.html 1KB
SnowflakeZookeeperHolder.java 11KB
ChatServiceImpl.java 10KB
MsgService.java 8KB
FriendServiceImpl.java 7KB
MsgService.java 6KB
NettyServer.java 5KB
NettyServer.java 5KB
AdminServiceImpl.java 5KB
ChatHandler.java 4KB
ChatHandler.java 4KB
SnowflakeIDGenImpl.java 4KB
UserServiceImpl.java 4KB
RedissonUtil.java 4KB
ChatController.java 3KB
FriendController.java 3KB
GroupMessage.java 2KB
Utils.java 2KB
ResultInfo.java 2KB
GateWayFilter.java 2KB
UserController.java 2KB
AdminController.java 2KB
LogAspect.java 2KB
RedisService.java 2KB
TokenUtil.java 2KB
UserInfoServiceFallback.java 2KB
UserInfoServiceFallback.java 2KB
RedisService.java 2KB
DubboUtil.java 2KB
HeartBeatHandler.java 2KB
SnowflakeService.java 2KB
DruidConfig.java 2KB
DruidConfig.java 2KB
Message.java 1KB
HeartBeatHandler.java 1KB
PushController.java 1KB
PushController.java 1KB
CodeEnum.java 1KB
SwaggerConfig.java 1KB
SwaggerConfig.java 1KB
ChatMapper.java 1KB
UserChannelRelation.java 1KB
ConnectorApplication.java 1KB
SentinelGateWayConfig.java 1KB
ConnectorApplication.java 1KB
UserInfoService.java 1KB
UserInfoService.java 1KB
ChatService.java 1KB
DateUtil.java 1KB
UserChannelRelation.java 1KB
SpringUtil.java 1KB
User.java 1KB
YmlConfigUtil.java 1KB
GroupMessageListener.java 1KB
UserInfoService.java 1KB
MessageListener.java 1014B
MessageEntity.java 1001B
ElasticSearchConfig.java 986B
WebMvcConfig.java 946B
WebMvcConfig.java 946B
PushServiceImpl.java 945B
UserSupport.java 902B
BoxVo.java 890B
FriendMapper.java 877B
ChatApplication.java 874B
Admin.java 813B
UserService.java 810B
UserInfoApplication.java 809B
ChatService.java 781B
ChatService.java 781B
FriendService.java 777B
MessageVo.java 774B
AdminMapper.java 752B
ChatServiceFallback.java 713B
AdminService.java 710B
共 203 条
- 1
- 2
- 3
资源评论
龙年行大运
- 粉丝: 1385
- 资源: 3960
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- LabVIEW实现LoRa通信【LabVIEW物联网实战】
- CS-TY4-4WCN-转-公版-XP1-8B4WF-wifi8188
- 计算机网络期末复习资料(课后题答案+往年考试题+复习提纲+知识点总结)
- 从零学习自动驾驶Lattice规划算法(下) 轨迹采样 轨迹评估 碰撞检测 包含matlab代码实现和cpp代码实现,方便对照学习 cpp代码用vs2019编译 依赖qt5.15做可视化 更新:
- 风光储、风光储并网直流微电网simulink仿真模型 系统由光伏发电系统、风力发电系统、混合储能系统(可单独储能系统)、逆变器VSR+大电网构成 光伏系统采用扰动观察法实现mppt控
- (180014016)pycairo-1.18.2-cp35-cp35m-win32.whl.rar
- (180014046)pycairo-1.21.0-cp311-cp311-win32.whl.rar
- DS-7808-HS-HF / DS-7808-HW-E1
- (180014004)pycairo-1.20.0-cp36-cp36m-win32.whl.rar
- (178330212)基于Springboot+VUE的校园图书管理系统
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功