没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
试读
21页
本文深入探讨了Zookeeper的关键应用和集群特性,涉及官方客户端的使用、Apache Curator客户端框架的应用,以及Zookeeper集群的不停机动态扩容和缩容。通过实际代码示例,详细说明了Zookeeper客户端实例的创建、节点的同步与异步创建、修改节点数据,以及Curator框架的应用实践。此外,还介绍了Zookeeper在分布式环境中的经典应用场景,包括分布式锁和集群模式的特点。文章以其深入浅出的讲解和实用的代码示例,适合希望深入了解Zookeeper集群和客户端应用的开发者。
资源推荐
资源详情
资源评论
ZookeeperJava客户端
ApacheCurator开源客户端
Zookeeper集群&不停机动态扩容/缩容
Zookeeper经典应用场景
ZookeeperJava客户端
项目构建
zookeeper官方的客户端没有和服务端代码分离,他们为同一个jar文件,所以我们直接引入
zookeeper的maven即可,这里版本请保持与服务端版本一致,不然会有很多兼容性的问题
1 <dependency>
2 <groupId>org.apache.zookeeper</groupId>
3 <artifactId>zookeeper</artifactId>
4 <version>3.5.8</version>
5 </dependency>
创建客户端实例:
为了便于测试,直接在初始化方法中创建zookeeper实例
1 @Slf4j
2 publicclassZookeeperClientTest{
3
4 privatestaticfinalStringZK_ADDRESS="192.168.109.200:2181";
5
6 privatestaticfinalintSESSION_TIMEOUT=5000;
7
8 privatestaticZooKeeperzooKeeper;
9
10 privatestaticfinalStringZK_NODE="/zk‐node";
11
12
13 @Before
14 publicvoidinit()throwsIOException,InterruptedException{
15 finalCountDownLatchcountDownLatch=newCountDownLatch(1);
16 zooKeeper=newZooKeeper(ZK_ADDRESS,SESSION_TIMEOUT,event‐>{
17 if(event.getState()==Watcher.Event.KeeperState.SyncConnected&&
18 event.getType()==Watcher.Event.EventType.None){
19 countDownLatch.countDown();
20 log.info("连接成功!");
21 }
22 });
23 log.info("连接中....");
24 countDownLatch.await();
25 }
26 }
创建Zookeeper实例的方法:
1 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher)
2 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,ZKClientC
onfig)
3 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,booleanc
anBeReadOnly,HostProvider)
4 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,booleanc
anBeReadOnly,HostProvider,ZKClientConfig)
5 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,booleanc
anBeReadOnly)
6 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,booleanc
anBeReadOnly,ZKClientConfig)
7 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,long,byt
e[])
8 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,long,byt
e[],boolean,HostProvider)
9 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,long,byt
e[],boolean,HostProvider,ZKClientConfig)
10 ZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,long,by
te[],boolean)
参数名称 含义
connectString
ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成,
每一个都代表一台ZooKeeper机器,如,
host1:port1,host2:port2,host3:port3。另外,也可以在connectString中设
置客户端连接上ZooKeeper
后的根目录,方法是在host:port字符串之后添加上这个根目录,例
如,host1:port1,host2:port2,host3:port3/zk-base,这样就指定了该客户端连
接上ZooKeeper服务器之后,所有对ZooKeeper
的操作,都会基于这个根目录。例如,客户端对/sub-node的操作,最终创建
/zk-node/sub-node,这个目录也叫Chroot,即客户端隔离命名空间。
sessionTimeout 会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有
会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳
检
测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效
的心跳检测,会话就会失效。
watcher
ZooKeeper允许
客户端在构造方法中传入一个接口watcher(org.apache.zookeeper.
Watcher)的实现类对象来作为默认的Watcher事件通知处理器。当然,该参
数可以设置为null以表明不需要设置默认的Watcher处理器。
canBeReadOnly
这是一个boolean类型的参数,用于标识当前会话是否支持“read-only(只
读)”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半
及
以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请
求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我
们
还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)——
这就是ZooKeeper的“read-only”模式。
sessionId和ses
sionPasswd
分别代表会话ID和会话秘钥。这两个参数能够唯一确定一个会话,同时客户
端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。具体
使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象
实
例的以下两个接口,即可获得当前会话的ID和秘钥:
longgetSessionId();
byte[]getSessionPasswd();
荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传
入构造方法了
同步创建节点:
1 @Test
2 publicvoidcreateTest()throwsKeeperException,InterruptedException{
3 Stringpath=zooKeeper.create(ZK_NODE,"data".getBytes(),ZooDefs.Ids.OPEN_A
CL_UNSAFE,CreateMode.PERSISTENT);
4 log.info("createdpath:{}",path);
5 }
异步创建节点:
1 @Test
2 publicvoidcreateAsycTest()throwsInterruptedException{
3 zooKeeper.create(ZK_NODE,"data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
4 CreateMode.PERSISTENT,
5 (rc,path,ctx,name)‐>log.info("rc{},path{},ctx{},name
{}",rc,path,ctx,name),"context");
6 TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
7 }
修改节点数据
1 @Test
2 publicvoidsetTest()throwsKeeperException,InterruptedException{
3
4 Statstat=newStat();
5 byte[]data=zooKeeper.getData(ZK_NODE,false,stat);
6 log.info("修改前:{}",newString(data));
7 zooKeeper.setData(ZK_NODE,"changed!".getBytes(),stat.getVersion());
8 byte[]dataAfter=zooKeeper.getData(ZK_NODE,false,stat);
9 log.info("修改后:{}",newString(dataAfter));
10 }
什么是Curator
Curator是一套由netflix公司开源的,Java语言编程的ZooKeeper客户端框架,Curator项目
是现在ZooKeeper客户端中使用最多,对ZooKeeper版本支持最好的第三方客户端,并推荐使
用,Curator把我们平时常用的很多ZooKeeper服务开发功能做了封装,例如Leader选举、
分布式计数器、分布式锁。这就减少了技术人员在使用ZooKeeper时的大部分底层细节开发工
作。在会话重新连接、Watch反复注册、多种异常处理等使用场景中,用原生的ZooKeeper
处理比较复杂。而在使用Curator时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。
Curator实战
这里我们以Maven工程为例,首先要引入Curator框架相关的开发包,这里为了方便测试引入
了junit,lombok,由于Zookeeper本身以来了log4j日志框架,所以这里可以创建对应的
log4j配置文件后直接使用。如下面的代码所示,我们通过将Curator相关的引用包配置到
Maven工程的pom文件中,将Curaotr框架引用到工程项目里,在配置文件中分别引用了两
个Curator相关的包,第一个是curator-framework包,该包是对ZooKeeper底层API的一
些封装。另一个是curator-recipes包,该包封装了一些ZooKeeper服务的高级特性,如:
Cache事件监听、选举、分布式锁、分布式Barrier。
1 <dependency>
2 <groupId>org.apache.curator</groupId>
3 <artifactId>curator‐recipes</artifactId>
4 <version>5.0.0</version>
5 <exclusions>
6 <exclusion>
7 <groupId>org.apache.zookeeper</groupId>
8 <artifactId>zookeeper</artifactId>
9 </exclusion>
10 </exclusions>
11 </dependency>
12 <dependency>
13 <groupId>org.apache.zookeeper</groupId>
14 <artifactId>zookeeper</artifactId>
15 <version>3.5.8</version>
16 </dependency>
17 <dependency>
18 <groupId>junit</groupId>
19 <artifactId>junit</artifactId>
20 <version>4.13</version>
21 </dependency>
22 <dependency>
23 <groupId>org.projectlombok</groupId>
24 <artifactId>lombok</artifactId>
25 <version>1.18.12</version>
26 </dependency>
会话创建
要进行客户端服务器交互,第一步就要创建会话
Curator提供了多种方式创建会话,比如用静态工厂方式创建:
1 //重试策略
2 RetryPolicyretryPolicy=newExponentialBackoffRetry(1000,3)
3 CuratorFrameworkclient=CuratorFrameworkFactory.newClient(zookeeperConnectio
nString,retryPolicy);
4 client.start();
或者使用fluent风格创建
1 RetryPolicyretryPolicy=newExponentialBackoffRetry(1000,3);
2 CuratorFrameworkclient=CuratorFrameworkFactory.builder()
3 .connectString("192.168.128.129:2181")
4 .sessionTimeoutMs(5000)//会话超时时间
5 .connectionTimeoutMs(5000)//连接超时时间
6 .retryPolicy(retryPolicy)
7 .namespace("base")//包含隔离名称
8 .build();
9 client.start();
剩余20页未读,继续阅读
资源评论
光芒软件工匠
- 粉丝: 789
- 资源: 64
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功