没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
MQ核心重点问题
如何保证消息队列的高可用
如何保证消息的可靠性传输
如何保证消息的顺序性
如何保证超高并发和超高性能
MQ技术选型
RoketMQ领域模型
RoKetMQ整体架构设计
消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,
Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每
个
Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message
Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
ConsumerGroup 由多个Consumer 实例构成。
消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发
送到
broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步
和异
步方式均需要Broker返回确认信息,单向发送不需要。
消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将
其提
供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行
消息
订阅的基本单位。
Topic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息等。
一条消息必须有一个Topic。
最细粒度的订阅单位,一个Group可以订阅多个Topic的消息。
Message Queue
一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩
展能力。
在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的
每个存
储单元都是不定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,
理论
上在 100年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。
代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取
请求
作准备。
代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列
表。
多个Namesrv实例组成集群,但相互独立,没有信息交换。
拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、
主动
权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实
时性
较高。
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他
生产
者实例以提交或回溯消费
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组
的消
费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广
播消
费(Broadcasting)
集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息
普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消
息则
可能是无顺序的
严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的
消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过
Message ID和Key查询消息的功能
标签(Tag)
Tag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息等
RocketMQ提供2级消息分类,方便灵活控制
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不
同业
务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化
RocketMQ提
供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性
RoketMQ环境部署构建
软件准备
版本:RocketMQ4.9.3
下载地址: https://rocketmq.apache.org/download/
github: https://github.com/apache/rocketmq
环境安装
JDK1.8以上
Linux 64位系统
RoketMQ4.9.3安装
修改配置文件(根据机器内存调整)
bin/runserver.sh
bin/runbroker.sh
bin/tools.sh
nameserver:
# limitations under the License.
#=======================================================================
====================
# Java Environment Setting
#=======================================================================
====================
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME
variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}
#=======================================================================
====================
# JVM Configuration
#=======================================================================
====================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600
choose_gc_log_directory()
{
case "`uname`" in
Darwin)
if [ ! -d "/Volumes/RAMDisk" ]; then
# create ram disk on Darwin systems as gc-log directory
DEV=`hdiutil attach -nomount ram://$((2 * 1024 *
DIR_SIZE_IN_MB))` > /dev/null
diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
echo "Create RAMDisk /Volumes/RAMDisk for gc logging on
Darwin OS."
fi
GC_LOG_DIR="/Volumes/RAMDisk"
;;
*)
runbroker
# check if /dev/shm exists on other systems
if [ -d "/dev/shm" ]; then
GC_LOG_DIR="/dev/shm"
else
GC_LOG_DIR=${BASE_DIR}
fi
;;
esac
}
# 主要修改JAVA_OPT 内存大小
choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version
"([0-9]*).*$/\1/p')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ]
; then
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -
XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70
-XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -
XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -
Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -
XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -
XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -XX:MetaspaceSize=128m
-XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -
XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -
XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -
Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,f
ilesize=30M"
fi
}
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}
#=======================================================================
====================
# JVM Configuration
#=======================================================================
====================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
剩余39页未读,继续阅读
资源评论
定格我的天空
- 粉丝: 10
- 资源: 9
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功