rocketMQ集群
一、集群模式
RocketMQ 从 4.5 开始支持故障自动切换能力。当主从集群中的 Master 故障后,可以从多个 Slave 中选举出新的 Master,完成故障转移,降低人工干预成本。
1.1 单节点
优点:适合本地开发和功能测试,部署简单;同步刷盘场景下消息可靠性较高。
缺点:单点故障明显,一旦宕机服务就不可用。
1.2 主从模式(异步复制、同步双写)
优点:同步双写模式下消息可靠性较高;主节点故障后,从节点仍可对外提供部分消费能力。
缺点:主从之间存在短暂同步延迟;主节点故障时通常无法继续写入。
1.3 双主模式
优点:配置相对简单,可通过磁盘冗余进一步提升可靠性。
缺点:某个 Master 宕机期间,该节点上未被消费的消息在恢复前不可消费,实时性会受影响。
1.4 双主双从/多主多从(异步复制)
优点:消息可靠性较高,主节点故障后消费者仍可从 Slave 消费,实时性影响较小。
缺点:仍然存在主从延迟;极端情况下可能丢失少量消息。
1.5 双主双从/多主多从(同步双写)
优点:只有主从同时写入成功后才向应用返回成功,服务可用性和数据可用性都更高。
缺点:性能相比异步复制略低。
二、消息可靠性
推荐组合是:同步双写(SYNC_MASTER)+ 异步刷盘(ASYNC_FLUSH)。
这种方式的含义是:主从复制要求同步成功,但消息从内存刷到磁盘采用异步方式,在可靠性和性能之间取得平衡。
三、集群安装
集群部署时,需要修改多台服务器 conf 目录下的配置文件。
2m-2s-async:双主双从,异步复制2m-2s-sync:双主双从,同步双写2m-noslave:双主模式
如果采用一主一从模式,只需要准备主节点和从节点两个配置文件;如果采用双主双从,则 A、B 两台机器都需要分别启动对应的 Master 和 Slave 配置。
3.1 broker-a.properties
主节点 Master 配置:
#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE 无影响
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.56.101:9876;192.168.56.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.56.102
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000003.2 broker-a-s.properties
主节点对应的 Slave 配置:
#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.101:9876;192.168.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.101
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000003.3 broker-b.properties
另一台机器上的 Master 配置:
#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE 无影响
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.56.101:9876;192.168.56.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.56.101
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000003.4 broker-b-s.properties
另一台机器上的 Slave 配置:
#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.101:9876;192.168.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.102
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000master节点服务器的启动脚本
#!/bin/sh
#
# rocketmq - this script starts and stops the rocketmq daemon
#
# chkconfig: - 85 15
ROCKETMQ_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release
ROCKETMQ_BIN=${ROCKETMQ_HOME}/bin
ADDR="192.168.56.101:9876;192.168.56.102:9876"
LOG_DIR=${ROCKETMQ_HOME}/logs
NAMESERVER_LOG=${LOG_DIR}/namesrv.log
BROKER_LOG=${LOG_DIR}/broker.log
ROCKETMQ_CONSOL=/usr/local/rocketMQ/view
LOCAL_ADDR=$(hostname -i)
CLUSTER_CONFIG_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
BROKER_A=${CLUSTER_CONFIG_HOME}/broker-a.properties
BROKER_A_S=${CLUSTER_CONFIG_HOME}/broker-a-s.properties
BROKER_B=${CLUSTER_CONFIG_HOME}/broker-b.properties
BROKER_B_S=${CLUSTER_CONFIG_HOME}/broker-b-s.properties
BROKER_A_LOG=${LOG_DIR}/broker-a.log
BROKER_A_S_LOG=${LOG_DIR}/broker-a-s.log
BROKER_B_LOG=${LOG_DIR}/broker-b.log
BROKER_B_S_LOG=${LOG_DIR}/broker-b-s.log
usage() {
echo "********************************************************"
echo "usage: sh server.sh [start|stop|restart]"
echo " *1, start * "
echo " *2, stop * "
echo " *3, restart * "
echo "********************************************************"
read -p "please input option:" option
case "$option" in
"1")
start
;;
"2")
stop
;;
"3")
restart
;;
*)
usage
;;
esac
}
if_exist() {
pid=$(ps -ef | grep rocketmq-console-ng-2.0.0.jar | grep -v grep | awk '{print $2}')
if [[ -z ${pid} ]]; then
return 1
else
return 0
fi
}
start() {
if [ ! -d ${LOG_DIR} ]; then
mkdir ${LOG_DIR}
fi
cd ${ROCKETMQ_BIN}
nohup sh mqnamesrv >${NAMESERVER_LOG} 2>&1 &
echo "The Name Server boot success..."
sleep 3
nohup sh mqbroker -c ${BROKER_B} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_B_LOG} 2>&1 &
echo "The broker[${ADDR}] master boot success..."
sleep 3
nohup sh mqbroker -c ${BROKER_A_S} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_A_S_LOG} 2>&1 &
echo "The broker[${ADDR}] slave boot success..."
cd ${ROCKETMQ_CONSOL}
nohup java -jar rocketmq-console-ng-2.0.0.jar >/dev/null &
sleep 3
echo "The rocketmq-console-ng-2.0.0.jar in running "
}
stop() {
cd ${ROCKETMQ_BIN}
sh mqshutdown broker
sleep 2
sh mqshutdown namesrv
sleep 2
if_exist
if [[ $? -eq "0" ]]; then
for ((i = 0; i < 10; i++)); do
if_exist
if [[ $? -eq "0" ]]; then
kill -9 $pid
echo "rocketmq-console-ng-2.0.0.jar is stop"
else
break
fi
done
else
echo "rocketmq-console-ng-2.0.0.jar is not running"
fi
}
restart() {
stop
sleep 2
start
}
case "$1" in
"start")
start
;;
"stop")
stop
;;
"restart")
restart
;;
*)
usage
;;
esacslave节点服务器的启动脚本
#!/bin/sh
#
# rocketmq - this script starts and stops the rocketmq daemon
#
# chkconfig: - 85 15
ROCKETMQ_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release
ROCKETMQ_BIN=${ROCKETMQ_HOME}/bin
ADDR="192.168.56.101:9876;192.168.56.102:9876"
LOG_DIR=${ROCKETMQ_HOME}/logs
NAMESERVER_LOG=${LOG_DIR}/namesrv.log
ROCKETMQ_CONSOL=/usr/local/rocketMQ/view
LOCAL_ADDR=$(hostname -i)
CLUSTER_CONFIG_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
BROKER_A=${CLUSTER_CONFIG_HOME}/broker-a.properties
BROKER_A_S=${CLUSTER_CONFIG_HOME}/broker-a-s.properties
BROKER_B=${CLUSTER_CONFIG_HOME}/broker-b.properties
BROKER_B_S=${CLUSTER_CONFIG_HOME}/broker-b-s.properties
BROKER_A_LOG=${LOG_DIR}/broker-a.log
BROKER_A_S_LOG=${LOG_DIR}/broker-a-s.log
BROKER_B_LOG=${LOG_DIR}/broker-b.log
BROKER_B_S_LOG=${LOG_DIR}/broker-b-s.log
usage() {
echo "********************************************************"
echo "usage: sh server.sh [start|stop|restart]"
echo " *1, start * "
echo " *2, stop * "
echo " *3, restart * "
echo "********************************************************"
read -p "please input option:" option
case "$option" in
"1")
start
;;
"2")
stop
;;
"3")
restart
;;
*)
usage
;;
esac
}
if_exist() {
pid=$(ps -ef | grep rocketmq-console-ng-2.0.0.jar | grep -v grep | awk '{print $2}')
if [[ -z ${pid} ]]; then
return 1
else
return 0
fi
}
start() {
if [ ! -d ${LOG_DIR} ]; then
mkdir ${LOG_DIR}
fi
cd ${ROCKETMQ_BIN}
nohup sh mqnamesrv >${NAMESERVER_LOG} 2>&1 &
echo "The Name Server boot success..."
sleep 3
nohup sh mqbroker -c ${BROKER_A} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_A_LOG} 2>&1 &
echo "The broker[${ADDR}] master boot success..."
sleep 3
nohup sh mqbroker -c ${BROKER_B_S} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_B_S_LOG} 2>&1 &
echo "The broker[${ADDR}] slave boot success..."
cd ${ROCKETMQ_CONSOL}
nohup java -jar rocketmq-console-ng-2.0.0.jar >/dev/null &
sleep 3
echo "The rocketmq-console-ng-2.0.0.jar in running "
}
stop() {
cd ${ROCKETMQ_BIN}
sh mqshutdown broker
sleep 2
sh mqshutdown namesrv
sleep 2
if_exist
if [[ $? -eq "0" ]]; then
for ((i = 0; i < 10; i++)); do
if_exist
if [[ $? -eq "0" ]]; then
kill -9 $pid
echo "rocketmq-console-ng-2.0.0.jar is stop"
else
break
fi
done
else
echo "rocketmq-console-ng-2.0.0.jar is not running"
fi
}
restart() {
stop
sleep 2
start
}
case "$1" in
"start")
start
;;
"stop")
stop
;;
"restart")
restart
;;
*)
usage
;;
esac进入bin目录下执行:sh mqadmin clusterList -n 192.168.56.101:9876查看当前的集群是否启动成功。
四、集群选举
rocketmq DLedger集群自动选举leader,主从节点切换。
同一组rocketmq Dledger至少需要3个节点,通过raft选举产生leader,其余节点作为follower,实现整个集群的高可用;
如果只有2个节点,当leader节点故障后,由于得不到过半选票,follower节点不会切换为leader节点;
同一集群可部署多个rocketmq dledger group(dLegerGroup),同时对外提供服务
在各自配置文件增加如下配置:
#开启DLedger,默认为false
enableDLegerCommitLog=true
#群组名称,建议和brokerName一致
dLegerGroup=broker-a
#各节点端口信息,如:n0-127.0.0.1:10911;n1-127.0.0.1:10911;n2-127.0.0.1:10911
dLegerPeers:n0-192.168.56.101:10911;n1-192.168.56.102:10911
#节点id,如:n0
dLegerSelfId:n0
#发送线程个数,建议设置成cpu个数
sendMessageThreadPoolNums:2