package org.byron4j.cookbook.zk.chapter4;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
* 建立自己的监视点(Watcher)
*/
@Slf4j
public class Master6 implements Watcher {
static ZooKeeper zk;
static String hostPort;
static String serverId = Integer.toHexString(new Random().nextInt());
static boolean isLeader = false;
static MasterStates states;
static List<String> workerList;
// 创建 getData 回调方法对象
static AsyncCallback.DataCallback masterCheckCallback = new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
checkMaster();
return;
case NONODE:
runForMaster();
return;
}
}
};
// 创建主从模型目录的回调方法对象
static AsyncCallback.StringCallback createParentCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)){
case OK:
log.info("Parent " + path + " created");
break;
case CONNECTIONLOSS:
createParent(path, (byte[]) ctx);
break;
case NODEEXISTS:
log.warn("Parent already registered: " + path);
break;
default:
log.error("Something went wrong: ", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
// /master 是否存在的异步调用函数
static AsyncCallback.StatCallback masterExistsCallback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
existsMaster(); // 连接丢失则需要重新设置监视点
break;
case OK:
if( stat == null ){ // 在 create 回调函数执行和 exists 操作执行期间发生了 /master 节点被删除的情况,则如果返回了OK,还需要检查stat对象是否为空,为空则表示已经被删除了
states = MasterStates.RUNNING;
runForMaster();
}
break;
default:
checkMaster();
break;
}
}
};
// 创建create回调方法对象
static AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
checkMaster();
return;
case OK:
isLeader = true;
break;
default:
isLeader = false;
}
System.out.println("I'm " + (isLeader ? "" : "not ") + "the leader");
}
};
/**
* 初始化时,创建主从模式的三个目录
*/
static void bootstrap(){
createParent("/workers", new byte[0]);
createParent("/tasks", new byte[0]);
createParent("/assign", new byte[0]);
}
Master6(String hostPort){
this.hostPort = hostPort;
}
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeDeleted){
// 节点被删除事件
assert "/master".equals(event.getPath());
runForMaster();
}
}
void startZk() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
void stopZk() throws InterruptedException {
zk.close();
}
static void runForMaster() {
zk.create("/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, masterCreateCallback, null);
}
/**
* 如果存在了 /master 返回true
* @return
*/
static void checkMaster(){
zk.getData("/master",false, masterCheckCallback, null);
}
/**
* 创建 /workers、/tasks、/assign
* @param path
* @param data
*/
static void createParent(String path, byte[] data){
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
}
static Watcher masterExistsWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeDeleted ){
// 节点已删除
log.info("/master is equals to path: " + "/master".equals(event.getPath()));
runForMaster();
}
}
};
static void existsMaster(){
zk.exists("/master", masterExistsWatcher, masterExistsCallback, null);
}
/***********************************主节点等待从节点列表的变化START****************************************/
static Watcher workerChangeWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if( event.getType() == Event.EventType.NodeChildrenChanged ){
// 子节点变更事件
log.info( "Is /workers's children znode changed: " + "/workers".equalsIgnoreCase(event.getPath()));
getWorkers();
}
}
};
/**
* 获取子节点并设置监视点
*/
static void getWorkers(){
zk.getChildren("/workers", workerChangeWatcher, workersGetChildrenCallback, null);
}
// 子节点变更回调对象
static AsyncCallback.ChildrenCallback workersGetChildrenCallback = new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)){
case CONNECTIONLOSS:
// 连接丢失,重新获取子节点并设置监视点
getWorkers();
break;
case OK:
// OK
// TODO 重新分配崩溃的节点,并重新设置监视点
reassignAndSet(children);
workerList = children;
default:
log.error("getChildren failed", KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
// 用于保存上次获得的节点列表的本地缓存
static ChildrenCache childrenCache;
/**
* 重新分配崩溃的节点,并重新设置监视点
*/
static void reassignAndSet(List<String> children){
List<String> process = null;
if( childrenCache == null ){
childrenCache = ChildrenCache.builder().data(children).build();
}else{
log.info("Removing and setting");
process = childrenCache.removeAndSet(children);
}
if( process != null ){
for( String worker : process ){
//TODO getAbsentWorkerTasks(worker);
}
}
}
@Setter
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
该项目是一款基于Java语言的CookBook设计源码集合,包含685个文件,涵盖229个Java源文件、213个PNG图片文件、159个Markdown文档、26个JPG图片文件,以及少量其他类型文件,如XML、JAR、属性文件等。该集合旨在提供丰富的Java编程资源和Shell脚本示例,适用于需要Java和Shell脚本编程技能的开发者。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java语言的CookBook设计源码与Shell脚本示例集合 (703个子文件)
application.yml.back 301B
application.properties.back 254B
CountMojo.class 4KB
CrfBankcardOptRecord.class 4KB
CrfLoanClaimInfo.class 2KB
Javassist2ClassPool.class 1KB
Demo.class 534B
CrfBankcardOptRecordMapper.class 459B
CrfLoanClaimInfoMapper.class 406B
Rectangle.class 361B
Point.class 302B
AAA.class 254B
mvnw.cmd 6KB
nginx.conf 4KB
keepalived.conf 2KB
emp.data 81B
org.byron4j.cookbook.javacore.spi.DefaultView 45B
.DS_Store 6KB
.DS_Store 6KB
.DS_Store 6KB
dlzd-qrcode.gif 198KB
.gitignore 314B
sqlite-jdbc-3.14.2.1.jar 4.12MB
mysql-connector-java-5.1.27.jar 852KB
mybatis-generator-core-1.3.2.jar 504KB
maven-wrapper.jar 50KB
mvn-local-plugin-0.0.1-SNAPSHOT.jar 6KB
Master6.java 13KB
Master5.java 6KB
Master7.java 5KB
MavenWrapperDownloader.java 5KB
SingletonTest.java 4KB
Master4.java 4KB
MessageClient.java 4KB
PacketCodeC.java 4KB
CountMojo.java 4KB
ByteBufTest.java 4KB
Javassist1RWBytecode.java 4KB
MQTransactionProducerDemo.java 4KB
Worker.java 3KB
ObjectPool.java 3KB
MQTransactionConsumerDemo.java 3KB
TestCase.java 3KB
MessageServerHandler.java 3KB
Master3.java 3KB
MockitoAnnoTest.java 3KB
Javassist2ClassPool.java 3KB
JdbcTemplateDao.java 2KB
MQConsumerDemo.java 2KB
BinaryTree.java 2KB
Master2.java 2KB
MessageClientHandler.java 2KB
AspectjMain.java 2KB
MockitoTest.java 2KB
ClientHandler.java 2KB
ServerHandler.java 2KB
NettyClient.java 2KB
HelloController.java 2KB
UiApplication.java 2KB
MQProducerDemo.java 2KB
ExecutorDemo.java 2KB
TimerServer.java 2KB
AdminClient.java 2KB
BasicHttpServerExample2.java 2KB
QuickSort.java 2KB
SampleLoader.java 2KB
NettyServer.java 2KB
IOServer.java 2KB
NamedParameterJdbcTemplateDao.java 2KB
MessageServer.java 2KB
HelloClassFileTransformer.java 2KB
ObserverTest.java 1KB
SingletonZCloneSerializableReadResolve.java 1KB
C3P0Utils.java 1KB
MinorGCDemo.java 1KB
JDBCConnectionPool.java 1KB
VolatileDemo.java 1KB
JdbcTemplateQuery.java 1KB
MyInterceptor.java 1KB
JDKDynamicProxy.java 1KB
Client.java 1KB
CglibProxy.java 1KB
Publisher2WeakReference.java 1KB
AccountAdapter.java 1KB
Master.java 1KB
Herb.java 1KB
SingletonZCloneSerializable.java 1KB
Utils.java 1KB
PayrollDayStrategy.java 1KB
OffsetItemEnums.java 1KB
Publisher.java 1KB
SingletonZCloneUnabled.java 1KB
ResttemplateTest.java 1KB
Javassist3ClassLoaderTest.java 1KB
SingletonZClone.java 1KB
ReferenceCountGCDemo.java 1KB
Server.java 1KB
Client.java 1KB
Ch01.java 1KB
Meal.java 1KB
共 703 条
- 1
- 2
- 3
- 4
- 5
- 6
- 8
资源评论
csbysj2020
- 粉丝: 2608
- 资源: 5497
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功