没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
备注:
1.如果您此前未接触过 RocketMQ,请先阅读附录部分,以便了解 RocketMQ 的整
体架构和相关术语
2.文中的 MQServer 与 Broker 表示同一概念
分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高
吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:
1. 消息的顺序问题
2. 消息的重复问题
RocketMQ 作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解
决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?
关键特性以及其实现原理
一、顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产
生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺
序消费才有意义。但同时订单之间又是可以并行消费的。
假如生产者产生了 2 条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?
你脑中想到的可能是这样:
你可能会采用这种方式保证消息顺序
M1 发送到 S1 后,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,那么需要 M1 到
达消费端后,通知 S2,然后 S2 再将 M2 发送到消费端。
这个模型存在的问题是,如果 M1 和 M2 分别发送到两台 Server 上,就不能保证 M1
先达到,也就不能保证 M1 被先消费,那么就需要在 MQ Server 集群维护消息的
顺序。那么如何解决?一种简单的方式就是将 M1、M2 发送到同一个 Server 上:
保证消息顺序,你改进后的方法
这样可以保证 M1 先于 M2 到达 MQServer(客户端等待 M1 成功后再发送 M2),根
据先达到先被消费的原则,M1 会先于 M2 被消费,这样就保证了消息的顺序。
这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问
题:
网络延迟问题
只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所
示,如果发送 M1 耗时大于发送 M2 的耗时,那么 M2 就先被消费,仍然不能保证
消息的顺序。即使 M1 和 M2 同时到达消费端,由于不清楚消费端 1 和消费端 2 的
负载情况,仍然有可能出现 M2 先于 M1 被消费。如何解决这个问题?将 M1 和 M2
发往同一个消费者即可,且发送 M1 后,需要消费端响应成功后才能发送 M2。
但又会引入另外一个问题,如果发送 M1 后,消费端 1 没有响应,那是继续发送 M2
呢,还是重新发送 M1?一般为了保证消息一定被消费,肯定会选择重发 M1 到另
外一个消费端 2,就如下图所示。
保证消息顺序的正确姿势
这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端 1 没有响
应 Server 时有两种情况,一种是 M1 确实没有到达,另外一种情况是消费端 1 已
经响应,但是 Server 端没有收到。如果是第二种情况,重发 M1,就会造成 M1
被重复消费。也就是我们后面要说的第二个问题,消息重复问题。
回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较
容易,要实现严格的顺序消息,简单且可行的办法就是:
保证生产者 - MQServer - 消费者是一对一对一的关系
但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多
的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不
得不花费更多的精力来解决阻塞的问题。
但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛
盾,那么阿里是如何解决的?
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询
有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来
规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这
个角度来看消息的顺序问题,我们可以得出两个结论:
1、不关注乱序的应用实际大量存在
2、队列无序并不意味着消息无序
最后我们从源码角度分析 RocketMQ 怎么实现发送顺序消息。
一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业
务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId 相同
的消息,会发送到同一个队列:
// RocketMQ 默认提供了两种 MessageQueueSelector 实现:随机/Hash
SendResultsendResult = producer.send(msg, new MessageQueueSelector() {
@Override
publicMessageQueue select(List<MessageQueue>mqs, Message msg, Object
arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
returnmqs.get(index);
}
}, orderId);
在获取到路由信息以后,会根据 MessageQueueSelector 实现的算法来选择一个
队列,同一个 OrderId 获取到的队列是同一个队列。
privateSendResult send() {
// 获取 topic 路由信息
TopicPublishInfotopicPublishInfo =
this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null &&topicPublishInfo.ok()) {
MessageQueuemq = null;
// 根据我们的算法,选择一个发送队列
// 这里的 arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
returnthis.sendKernelImpl(msg, mq, communicationMode, sendCallback,
timeout);
}
}
}
二、消息重复
上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么
RocketMQ 是怎样解决消息重复的问题呢?还是“恰好”不解决。
造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避
免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问
题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
剩余25页未读,继续阅读
资源评论
小小哭包
- 粉丝: 1900
- 资源: 3864
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功