![rabbitmq.png](http://upload-images.jianshu.io/upload_images/1397675-8f305b180a895baf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**20180927 更新**
升级 retryCache的容器,修改为 ConcurrentSkipListMap
1 retry的时候按先后顺序尝试
2 hashMap无法自动缩容,在rabbitmq出现问题时,map造成积压,等问题恢复后,map的多余空间无法自动释放,而SkipListMap可以完美避开这个问题
3 在大量插入删除时,SkipList的效率更高
---
**20180710 更新**
1 升级spring-rabbit版本,升级到最新版本
2 去除对QueueConsumer的使用,改为使用basicGet方法(消费效率和原来的方式对比,有微弱提升)
3 改进一些打log的细节
---
**20180517 更新**
1 retryCache重构,解决rabbitmq挂掉时消息积压的问题
2 部分细节改进
---
**20171120 更新**
1 改进一些细节:遍历map时基于entry,增加一定的效率
---
**20170510 更新**
1 增加线程池consumer优雅退出机制Runtime.getRuntime().addShutdownHook
2 修改部分log输出方式,将原来的 log.info("exceptin:" + e) 修复为 log.info("exception: ", e)
---
**20161227 更新**
1 bug fix: 将messageProcess包裹在try,catch中,避免队列中出现unack的死信息
2 bug分析见http://www.jianshu.com/p/a7edc3322b44
---
**20161205 更新**
1 增加topic模式
2 原有的使用direct方式无需更改,本次为兼容性升级,增加了buildTopicMessageSender和buildTopicMessageConsumer方法
3 ThreadPoolConsumer默认为direct方式,可以通过setType("topic")修改为topic模式
---
**20160907 更新**
1 解决因网络抖动而引起的发送数据丢失
2 增加retry模块
3 在本地缓存已发送数据,根据ack的确认将已ack的删除
4 定时触发重发未收到ack的数据
5 保证在网络抖动的情况下数据不丢失,但可能会造成数据的重复发送(建议在consumer端做到message处理的幂等性)
---
最近的一个计费项目,在rpc调用和流式处理之间徘徊了许久,后来选择流式处理。一是可以增加吞吐量,二是事务的控制相比于rpc要容易很多。
确定了流式处理的方式,后续是技术的选型。刚开始倾向于用storm,无奈文档实在太少,折腾起来着实费劲。最终放弃,改用消息队列+微服务的方式实现。
消息队列的选型上,有activemq,rabbitmq,kafka等。最开始倾向于用activemq,因为以前的项目用过,很多代码都是可直接复用的。后来看了不少文章对比,发现rabbitmq对多语言的支持更好一点,同时相比于kafka,牺牲了部分的性能换取了更好的稳定性安全性以及持久化。
最终决定使用rabbitmq。
rabbitmq的官网如下:
>https://www.rabbitmq.com/
对rabbitmq的封装,有几个目标:
1 提供send接口
2 提供consume接口
3 保证消息的事务性处理
所谓事务性处理,是指对一个消息的处理必须严格可控,必须满足原子性,只有两种可能的处理结果:
(1) 处理成功,从队列中删除消息
(2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列
为了做到这点,我们使用rabbitmq的手动ack模式,这个后面细说。
**1 send接口**
```
public interface MessageSender {
DetailRes send(Object message);
}
```
send接口相对简单,我们使用spring的RabbitTemplate来实现,代码如下:
```
//1 构造template, exchange, routingkey等
//2 设置message序列化方法
//3 设置发送确认
//4 构造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
//2
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//3
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
}
}
});
//4
return new MessageSender() {
@Override
public DetailRes send(Object message) {
try {
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException e) {
e.printStackTrace();
log.info("send failed " + e);
try {
//retry
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException error) {
error.printStackTrace();
log.info("send failed again " + error);
return new DetailRes(false, error.toString());
}
}
return new DetailRes(true, "");
}
};
}
```
**2 consume接口**
```
public interface MessageConsumer {
DetailRes consume();
}
```
在consume接口中,会调用用户自己的MessageProcess,接口定义如下:
```
public interface MessageProcess<T> {
DetailRes process(T message);
}
```
consume的实现相对来说复杂一点,代码如下:
```
//1 创建连接和channel
//2 设置message序列化方法
//3 构造consumer
public <T> MessageConsumer buildMessageConsumer(String exchange, String routingKey,
final String queue, final MessageProcess<T> messageProcess) throws IOException {
final Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
//2
final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
final MessageConverter messageConverter = new Jackson2JsonMessageConverter();
//3
return new MessageConsumer() {
QueueingConsumer consumer;
{
consumer = buildQueueConsumer(connection, queue);
}
@Override
//1 通过delivery获取原始数据
//2 将原始数据转换为特定类型的包
//3 处理数据
//4 手动发送ack确认
public DetailRes consume() {
QueueingConsumer.Delivery delivery = null;
Channel channel = consumer.getChannel();
try {
//1
delivery = consumer.nextDelivery();
Message message = new Message(delivery.getBody(),
messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));
//2
@SuppressWarnings("unchecked")
T messageBean = (T) messageConverter.fromMessage(message);
//3
DetailRes detailRes = messageProcess.process(messageBean);
//4
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
log.info("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
return detailRes;
} catch (InterruptedException e) {
e.printStackTrace();
return new DetailRes(false, "interrupted exception " + e.toString());
} catch (IOException e) {
e.printStackTrace();
retry(delivery, channel);
rabbitmq手动确认模式java封装
需积分: 1 134 浏览量
2024-05-21
16:52:00
上传
评论
收藏 21KB ZIP 举报
stormsha
- 粉丝: 5158
- 资源: 152
最新资源
- 基于GUI+MYSQL+JAVA图书管理系统文档说明+源码(高分大作业项目).zip
- 基于Qt使用C++实现图书管理系统源码+数据库(95分以上).zip
- 基于GUI+MYSQL+JAVA票务管理系统文档介绍+源码+数据库(高分大作业).zip
- 优先编码器除法电微分运算电路 全加器函数发生电路等电路经典Multisim仿真实验源文件合集(25个).zip
- 2331308JS课堂案例.zip
- STM32H750VBT6单片机最小系统开发板AD设计硬件(原理图+PCB+3D封装库)工程文件.zip
- 基于74LS161+ 74LS192芯片实现倒计时定时器Multisim仿真源文件,Multisim10以上版本可打开运行
- 科大讯飞语音引擎 jar包 demo,科大讯飞语音合成引擎3.0,支持4.0系统以上,文字转语音输出.zip
- Java架构面试笔试专题资料及经验(含答案)SpringBoot面试Linux面试专题及答案 合集.zip
- 头歌c语言实验答案tion-model-for-ne开发笔记
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈