/**
* Copyright: Copyright (c) 2015
*
* @author youaremoon
* @date 2016年6月25日
* @version V1.0
*/
package com.redis;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
/**
* 在集群模式下提供批量操作的功能。 <br/>
* 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),
* 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />
* 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>
* 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />
* 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />
*
* 该类非线程安全
*
* @author youaremoon
* @version
* @since Ver 1.1
*/
public class JedisClusterPipeline extends PipelineBase implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);
// 部分字段没有对应的获取方法,只能采用反射来做
// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
// 我没这样做是因为懒
private static final Field FIELD_CONNECTION_HANDLER;
private static final Field FIELD_CACHE;
static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
}
private JedisSlotBasedConnectionHandler connectionHandler;
private Queue<Client> clients = new LinkedList<Client>(); // 根据顺序存储每个命令对应的Client
private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用于缓存连接
/**
* 根据jedisCluster实例生成对应的JedisClusterPipeline
* @param
* @return
*/
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
JedisClusterPipeline pipeline = new JedisClusterPipeline();
pipeline.setJedisCluster(jedisCluster);
return pipeline;
}
public void setJedisCluster(JedisCluster jedis) {
JedisSlotBasedConnectionHandler ch = getValue(jedis, FIELD_CONNECTION_HANDLER);
if (null == ch) {
throw new RuntimeException("cannot get JedisSlotBasedConnectionHandler from JedisCluster");
}
connectionHandler = ch;
}
/**
* 刷新集群信息,当集群信息发生变更时调用
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
/**
* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
*/
public void sync() {
innerSync(null);
}
/**
* 同步读取所有数据 并按命令顺序返回一个列表
*
* @return 按照命令的顺序返回所有的数据
*/
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<Object>();
innerSync(responseList);
return responseList;
}
private void innerSync(List<Object> formatted) {
HashSet<Client> clientSet = new HashSet<Client>();
try {
for (Client client : clients) {
// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,
// 发生了JedisMovedDataException这样的错误应用是不知道的,
// 因此需要调用get()来触发错误。
// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,
// 然而它并没有提供对应方法,要获取data属性就得用反射,
// 不想再反射了,所以就这样了
Object data = generateResponse(client.getOne()).get();
if (null != formatted) {
formatted.add(data);
}
// size相同说明所有的client都已经添加,就不用再调用add方法了
if (clientSet.size() != jedisMap.size()) {
clientSet.add(client);
}
}
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster();
}
throw jre;
} finally {
if (clientSet.size() != jedisMap.size()) {
// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
for (Jedis jedis : jedisMap.values()) {
if (clientSet.contains(jedis.getClient())) {
continue;
}
try {
jedis.getClient().getAll();
} catch (RuntimeException ex) {
// 其中一个client出问题,后面出问题的几率较大
}
}
}
close();
}
}
@Override
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
jedis.close();
}
jedisMap.clear();
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
@Override
protected Client getClient(byte[] key) {
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
Client client = jedis.getClient();
clients.add(client);
return client;
}
private Jedis getJedis(int slot) {
JedisClusterInfoCache cache = getValue(connectionHandler, FIELD_CACHE);
JedisPool pool = cache.getSlotPool(slot);
// 根据pool从缓存中获取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool, jedis);
}
return jedis;
}
private static Field getField(Class<?> cls, String fieldName) {
try {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (NoSuchFieldException | SecurityException e) {
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
java客户端不是很好支持redis cluster,spring-date-redis和jedis批量提交还不支持,单个提交都是可以的。 为了批量解决批量提交 网上有几个方案,本示例使用了其中一种,demo里的JedisClusterPipeline类是网上找的,忘记了作者叫什么(里面有注释)。 使用的spring版本是4.3.2以下版本可能不是很好支持,jedis版本是2.8.2, spring-data-redis版本是1.7.2。
资源推荐
资源详情
资源评论
收起资源包目录
redisCluster-demo.zip (64个子文件)
redisCluster-demo
.project 1KB
src
redis.properties 173B
applicationContext.xml 7KB
com
redis
BaseObjec.java 156B
JedisConnect.java 6KB
RedisBaseDao.java 643B
JedisUtils.java 875B
JedisClusterPipeline.java 10KB
ClusterSerializer.java 365B
WebRoot
WEB-INF
classes
redis.properties 173B
applicationContext.xml 7KB
com
redis
JedisClusterPipeline.class 9KB
JedisConnect$9.class 2KB
RedisBaseDao.class 1KB
JedisConnect.class 4KB
JedisConnect$8.class 2KB
JedisConnect$3.class 2KB
JedisConnect$5.class 2KB
ClusterSerializer.class 705B
JedisConnect$1.class 2KB
JedisUtils.class 1KB
JedisConnect$2.class 2KB
JedisConnect$7.class 2KB
JedisConnect$4.class 2KB
JedisConnect$6.class 2KB
BaseObjec.class 364B
lib
spring-aop-4.3.2.RELEASE.jar 371KB
spring-jdbc-4.3.2.RELEASE.jar 416KB
spring-aspects-4.3.2.RELEASE.jar 57KB
spring-context-4.3.2.RELEASE.jar 1.08MB
jedis-2.8.2.jar 528KB
spring-websocket-4.3.2.RELEASE.jar 439KB
spring-webmvc-4.3.2.RELEASE.jar 893KB
spring-instrument-4.3.2.RELEASE.jar 7KB
spring-messaging-4.3.2.RELEASE.jar 373KB
spring-context-support-4.3.2.RELEASE.jar 183KB
spring-jms-4.3.2.RELEASE.jar 282KB
spring-webmvc-4.3.2.RELEASE-javadoc.jar 1.92MB
spring-oxm-4.3.2.RELEASE.jar 83KB
spring-data-redis-1.7.2.RELEASE.jar 1.07MB
spring-test-4.3.2.RELEASE.jar 578KB
spring-beans-4.3.2.RELEASE.jar 739KB
spring-webmvc-4.3.2.RELEASE-sources.jar 837KB
spring-orm-4.3.2.RELEASE.jar 465KB
spring-core-4.3.2.RELEASE.jar 1.06MB
spring-instrument-tomcat-4.3.2.RELEASE.jar 10KB
spring-tx-4.3.2.RELEASE.jar 261KB
spring-expression-4.3.2.RELEASE.jar 258KB
spring-web-4.3.2.RELEASE.jar 793KB
spring-webmvc-portlet-4.3.2.RELEASE.jar 173KB
struts-2.0.dtd 3KB
web.xml 2KB
index.jsp 695B
META-INF
MANIFEST.MF 39B
.settings
org.eclipse.wst.jsdt.ui.superType.container 49B
org.eclipse.wst.common.project.facet.core.xml 462B
org.eclipse.jdt.core.prefs 364B
org.eclipse.wst.jsdt.ui.superType.name 6B
com.genuitec.eclipse.migration.prefs 79B
org.eclipse.wst.common.project.facet.core.prefs.xml 172B
org.eclipse.core.resources.prefs 57B
org.eclipse.wst.common.component 497B
.jsdtscope 522B
.classpath 803B
共 64 条
- 1
资源评论
- crying_moon2017-09-01非常感谢!
- 潇湘无痕5212017-12-21非常感谢!
cs_chang
- 粉丝: 0
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功