基于基于ZooKeeper实现队列源码实现队列源码
主要介绍了基于ZooKeeper实现队列源码的相关内容,包括其实现原理和应用场景,以及对队列的简单介绍,
具有一定参考价值,需要的朋友可以了解下。
实现原理实现原理
先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节
点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存
储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点
是持久化的,所以不必担心队列消息的丢失问题。
队列(队列(Queue))
分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue
node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序
(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X
是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除
也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小
的节点中取数据)。
应用场景应用场景
Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要
小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。
详细代码详细代码
ZookeeperClient工具类
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by Massive on 2016/12/18.
*/
public class ZooKeeperClient {
private static String connectionString = "localhost:2181";
private static int sessionTimeout = 10000;
public static ZooKeeper getInstance() throws IOException, InterruptedException {
//--------------------------------------------------------------
// 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
// 这里等Zookeeper的连接完成才返回实例
//--------------------------------------------------------------
final CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
} else if (event.getState() == Event.KeeperState.Expired) {
}
}
});
connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
return zk;
}
public static int getSessionTimeout() {
return sessionTimeout;
}
public static void setSessionTimeout(int sessionTimeout) {
ZooKeeperClient.sessionTimeout = sessionTimeout;
}
}
ZooKeeperQueue
package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
评论0
最新资源