没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
RocketMQ 技术讲解
Csdn 博客: http://blog.csdn.net/meilong_whpu
目录
ROCKETMQ 技术讲解 ......................................................................................................................... 1
1 存储篇.............................................................................................................................................. 6
1.1 整体结构 ................................................... 6
1.2 大文件的磁盘操作——MapedFile .............................. 6
1.2.1 向文件顺序写操作(appendMessage)..................... 7
1.2.2 消息刷盘操作(commit)................................ 7
1.2.3 随机读操作(selectMapedBuffer)....................... 8
1.2.4 清理内存操作.......................................... 8
1.2.5 判断文件是否写满...................................... 8
1.3 MapedFileQueue——为操作 commitlog/consumequeue 文件提供访问
服务 8
1.3.1 获取在某时间点之后更新的文件(getMapedFileByTime).... 9
1.3.2 清理指定偏移量所在文件之后的文件(truncateDirtyFiles) 9
1.3.3 获取或创建最后一个文件(getLastMapedFile)............ 9
1.3.4 获取列表中的最后一个文件(getLastMapedFile2)........ 10
1.3.5 统计内存的数据还有多少未刷到磁盘文件中
(howMuchFallBehind)...................................... 10
1.3.6 获取 MapedFile 队列中最小 Offset 值(getMinOffset).... 10
1.3.7 获取 MapedFile 队列中最大 Offset 值(getMaxOffset).... 10
1.3.8 删除某类文件中的最后一个文件(deleteLastMaped)...... 10
1.3.9 根据指定的 offset 找到所在文件(findMapedFileByOffset)
11
1.3.10MapedFile 队列中的消息刷盘(commit) ................. 11
1.4 Commitlog ................................................. 11
1.4.1 文件的消息单元存储结构............................... 11
1.4.2 CommitLog 类结构 ..................................... 13
1.4.3 获取最小 Offset(getMinOffset) ...................... 13
1.4.4 获取最大物理偏移量(getMaxOffset)................... 13
1.4.5 读取指定起始位置 offset 所在文件的全部剩余消息........ 13
1.4.6 正常恢复 CommitLog 内存数据(recoverNormally)........ 14
1.4.7 异常恢复 CommitLog 内存数据(recoverAbnormally)...... 14
1.4.8 写入消息(putMessage)............................... 15
1.4.9 读取消息(getMessage)............................... 17
1.4.10 指定位置开始写入二进制消息(appendData) ............ 17
1.4.11 获取指定位置所在文件的下一个文件的起始偏移量
(rollNextFile)........................................... 18
1.4.12DefaultAppendMessageCallback 类的实现 ................ 18
1.5 Consumequeue .............................................. 19
1.5.1 ConsumeQueue 类结构 .................................. 20
1.5.2 删除指定偏移量之后的逻辑文件(truncateDirtyLogicFiles)
20
1.5.3 恢复 ConsumeQueue 内存数据(recover)................. 21
1.5.4 查找消息发送时间最接近 timestamp 逻辑队列的 offset
(getOffsetInQueueByTime)................................. 21
1.5.5 获取最后一条消息对应物理队列的下一个偏移量
(getLastOffset).......................................... 22
1.5.6 消息刷盘(commit)................................... 22
1.5.7 将 commitlog 物理偏移量、消息大小等信息直接写入
consumequeue 中(putMessagePostionInfoWrapper) ............ 22
1.5.8 根据消息序号索引获取 consumequeue 数据(getIndexBuffer)
22
1.5.9 根据物理队列最小 offset 计算修正逻辑队列最小 offset
(correctMinOffset)....................................... 22
1.5.10 获取指定位置所在文件的下一个文件的起始偏移量
(rollNextFile)........................................... 23
1.6 IndexFile——为操作 Index 文件提供访问服务 ................. 23
1.6.1 Index 文件的数据结构 ................................. 24
1.6.2 向 index 文件中写入索引消息(putKey)................. 25
1.6.3 以 topic-key 值从 Index 中获取在一个时间区间内的物理偏移量
列表(selectPhyOffset).................................... 25
1.7 IndexService .............................................. 26
1.7.1 创建消息的索引(buildIndex)......................... 26
1.7.2 查找 topic 和 key 的物理偏移量 offset(queryOffest) ... 26
1.8 Config .................................................... 27
1.8.1 ScheduleMessageService 执行延迟消息 .................. 27
1.8.2 收到发送消息时创建 topic 的配置信息
(createTopicInSendMessageMethod)......................... 29
1.8.3 根据 GroupName 查找订阅组信息
(findSubscriptionGroupConfig)............................ 29
1.8.4 收到消费失败时的回传消息时创建 topic 的配置信息
(createTopicInSendMessageBackMethod)..................... 29
1.9 DefaultMessageStore—所有文件的访问入口 ................... 30
1.9.1 根据 topic 和 queueId 查找 ConsumeQueue ................ 30
1.9.2 根据物理偏移量和数据大小获取消息内容
(lookMessageByOffset).................................... 30
1.9.3 将消息写入 commitlog 中(putMessage)................. 30
1.9.4 读取 commitlog 消息(getMessage)..................... 31
1.9.5 获取最大物理偏移量(getMaxPhyOffset)................ 33
1.9.6 获取指定偏移量之后的所有 commitlog 数据............... 33
1.9.7 从指定位置开始追加 commitlog 数据(appendToCommitLog) 33
1.9.8 DefaultMessageStore.ReputMessageService 服务线程 ..... 33
1.9.9 DefaultMessageStore.DispatchMessageService 服务线程 .. 34
1.9.10 加载 ConsumeQueue 队列数据(loadConsumeQueue) ....... 35
1.9.11 获取指定队列的最大逻辑 Offset(getMaxOffsetInQuque). 35
1.9.12 获取指定队列的最小逻辑 Offset(getMinOffsetInQuque). 35
1.10 Abort ..................................................... 36
1.11 Checkpoint ................................................ 36
1.12 HA 高可用 ................................................. 36
1.12.1Topic 配置同步 ....................................... 36
1.12.2 消费进度信息同步 .................................... 37
1.12.3 延迟消费进度信息同步 ................................ 37
1.12.4 订阅关系同步 ........................................ 37
1.12.5 消息数据同步 ........................................ 37
1.13 事务消息相关的文件 ...................................... 41
1.13.1 事务消息状态文件 .................................... 42
1.13.2 事务消息 REDO 日志文件 ............................... 42
1.13.3 定期向 Producer 回查事务消息的最新状态 ............... 42
2 NAME SERVER 篇 ..................................................................................................................... 43
2.1 NameServer 的功能 ......................................... 43
2.2 NameServer 的初始化及启动过程 ............................. 43
2.3 处理 Broker 注册请求 ....................................... 45
2.4 根据 Topic 获取 Broker 信息和 topic 配置信息 ................. 46
3 BROKER 篇 ................................................................................................................................. 47
3.1 Broker 的初始化过程 ....................................... 47
3.2 Broker 的启动过程 ......................................... 50
3.3 向 NameServer 注册 Broker .................................. 52
3.4 清理未使用的 topic 数据 .................................... 53
3.5 根据 topic 和 group 查找当前的订阅信息 ...................... 53
3.6 处理 Producer 发送的消息 ................................... 53
3.7 处理心跳消息 .............................................. 54
3.8 注册 Consumer 信息 ......................................... 55
3.9 注册 Producer 信息 ......................................... 56
3.10 查询消费进度(QUERY_CONSUMER_OFFSET) ................... 56
3.11 更新消费进度(UPDATE_CONSUMER_OFFSET) .................. 57
3.12 处理 Consumer 拉取消息 ................................... 57
3.13 PullRequestHoldService 线程服务业务处理逻辑 ............... 60
3.14 处理锁住 MessageQueue 队列的请求(LOCK_BATCH_MQ) ........ 61
3.15 处理解锁 MessageQueue 队列的请求(UNLOCK_BATCH_MQ) ...... 62
3.16 获取 consumerGroup 名下的所有 Consumer 的 ClientId
(GET_CONSUMER_LIST_BY_GROUP)................................. 62
3.17 写入 Consumer 消费失败的消息(CONSUMER_SEND_MSG_BACK) ... 63
3.18 处理结束事务消息的请求(END_TRANSACTION) ............... 64
3.19 客户端发起更新或创建 Topic(UPDATE_AND_CREATE_TOPIC).... 64
4 PRODUCER 篇 ............................................................................................................................ 65
4.1 启动 Producer ............................................. 65
4.2 启动 MQClientInstance 类(MQClientInstance.start) ......... 66
4.3 更新 Topic 的路由信息(updateTopicRouteInfoFromNameServer) 68
4.4 向 Broker 发送心跳消息 ..................................... 71
4.5 发送普通消息(send) ...................................... 71
4.6 发送定时消息 .............................................. 73
4.7 发送顺序消息 .............................................. 74
4.8 发送事务消息 .............................................. 75
4.9 处理 Broker 检查事务状态的消息(CHECK_TRANSACTION_STATE) .. 76
4.10 创建 Topic............................................... 77
5 CONSUMER 篇 .............................................................................................................................. 78
5.1 启动 Consumer ............................................. 78
5.2 向 Broker 同步消费进度 ..................................... 83
5.3 RebalanceService 服务线程 ................................. 83
5.3.1 广播模式下的 RebalanceImpl.rebalanceByTopic 方法...... 84
5.3.2 集群模式下的 RebalanceImpl.rebalanceByTopic 方法...... 85
5.3.3 updateProcessQueueTableInRebalance 方法逻辑 .......... 86
5.3.4 removeUnnecessaryMessageQueue 方法逻辑 ............... 87
5.3.5 获取 MessageQueue 队列的下一个消费偏移量.............. 88
5.4 PullMessageService 服务线程 ............................... 90
5.4.1 不间断地处理 pullRequestQueue 队列中的 PullRequest 对象 90
5.4.2 让 PullRequest 对象延迟加入 pullRequestQueue 队列...... 90
5.5 PUSH 模式下的消息消费(DefaultMQPushConsumer) ............ 90
5.5.1 拉取消息(pullMessage).............................. 90
5.5.2 内部匿名类 PullCallback .............................. 92
5.6 PULL 模式下的消息消费(DefaultMQPullConsumer) ............ 95
5.6.1 应用层的使用方式..................................... 96
5.6.2 获取队列的消费进度(fetchConsumeOffset)............. 97
5.7 顺序消费的逻辑(ConsumeMessageOrderlyService) ............ 97
5.7.1 回调业务层定义的消费方法............................. 98
5.7.2 处理回调方法 consumeMessage 的执行结果............... 100
5.7.3 重新获取锁再消费(tryLockLaterAndReconsume)........ 102
5.8 并发消费的逻辑(ConsumeMessageConcurrentlyService) ...... 102
5.8.1 回调业务层定义的消费方法............................ 102
5.8.2 消息重试——消费失败的信息回传给 Broker ............. 104
5.9 底层的拉取消息的 API 接口(PullAPIWrapper.pullKernelImpl) 105
5.10 发送远程请求 PULL_MESSAGE 的逻辑 ........................ 106
5.10.1 同步发送方式 ....................................... 107
5.10.2 异步发送方式 ....................................... 108
5.10.3 收到 Broker 的响应消息的处理逻辑 .................... 109
5.11 PULL 消费模式下的调度消费服务 ............................ 110
5.12 四种 MessageQueue 队列的分配策略 ........................ 111
6 技术解决方案 .............................................................................................................................. 112
6.1 获取系统时间的性能优化 ................................... 112
剩余111页未读,继续阅读
资源评论
- 腊八粥20182021-06-1017年的文档,只要不介意这个,就行
- 等等留一手2017-09-28希望能 有用
meilong_whpu
- 粉丝: 94
- 资源: 8
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 已过基于Hadoop+Spark招聘推荐可视化系统 大数据项目 毕业设计(源码下载)
- python爬虫开发题答案及题目-100(1).zip
- Python 小游戏 (贪吃蛇、五子棋、扫雷、俄罗斯方块)-3 (2).zip
- c语言实现的数独小游戏.zip
- 高德地图中国行政区划省、市、县经纬度
- March 2024 Expiration Of The OAM Out Of The Box Certificates
- 二叉搜索树迭代器(java代码).docx
- 解决keil MDK 5.38版本 在Debug配置使用STlink调试时软件闪退的问题
- py小项目:用户登录和注册系统开发欢迎图片
- TCCEE-x64-v6.2.3(9.51)
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功