Skip to content

rocketMQ集群

一、集群模式

RocketMQ 从 4.5 开始支持故障自动切换能力。当主从集群中的 Master 故障后,可以从多个 Slave 中选举出新的 Master,完成故障转移,降低人工干预成本。

1.1 单节点

优点:适合本地开发和功能测试,部署简单;同步刷盘场景下消息可靠性较高。

缺点:单点故障明显,一旦宕机服务就不可用。

1.2 主从模式(异步复制、同步双写)

优点:同步双写模式下消息可靠性较高;主节点故障后,从节点仍可对外提供部分消费能力。

缺点:主从之间存在短暂同步延迟;主节点故障时通常无法继续写入。

1.3 双主模式

优点:配置相对简单,可通过磁盘冗余进一步提升可靠性。

缺点:某个 Master 宕机期间,该节点上未被消费的消息在恢复前不可消费,实时性会受影响。

1.4 双主双从/多主多从(异步复制)

优点:消息可靠性较高,主节点故障后消费者仍可从 Slave 消费,实时性影响较小。

缺点:仍然存在主从延迟;极端情况下可能丢失少量消息。

1.5 双主双从/多主多从(同步双写)

优点:只有主从同时写入成功后才向应用返回成功,服务可用性和数据可用性都更高。

缺点:性能相比异步复制略低。

二、消息可靠性

推荐组合是:同步双写(SYNC_MASTER)+ 异步刷盘(ASYNC_FLUSH)。

这种方式的含义是:主从复制要求同步成功,但消息从内存刷到磁盘采用异步方式,在可靠性和性能之间取得平衡。

三、集群安装

集群部署时,需要修改多台服务器 conf 目录下的配置文件。

  • 2m-2s-async:双主双从,异步复制
  • 2m-2s-sync:双主双从,同步双写
  • 2m-noslave:双主模式

如果采用一主一从模式,只需要准备主节点和从节点两个配置文件;如果采用双主双从,则 A、B 两台机器都需要分别启动对应的 Master 和 Slave 配置。

3.1 broker-a.properties

主节点 Master 配置:

#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE 无影响
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.56.101:9876;192.168.56.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.56.102
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/abort

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

3.2 broker-a-s.properties

主节点对应的 Slave 配置:

#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.101:9876;192.168.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.101
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/abort

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

3.3 broker-b.properties

另一台机器上的 Master 配置:


#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE 无影响
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.56.101:9876;192.168.56.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.56.101
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store/abort

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

3.4 broker-b-s.properties

另一台机器上的 Slave 配置:

#集群名称
brokerClusterName=Cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,例如:A服务器master叫broker-b,他的slave也叫broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.101:9876;192.168.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#本机IP
brokerIP1=192.168.102
storePathRootDir=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s
storePathCommitLog=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/store-s/abort

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

master节点服务器的启动脚本

#!/bin/sh
#
# rocketmq - this script starts and stops the rocketmq daemon
#
# chkconfig: - 85 15

ROCKETMQ_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release
ROCKETMQ_BIN=${ROCKETMQ_HOME}/bin
ADDR="192.168.56.101:9876;192.168.56.102:9876"
LOG_DIR=${ROCKETMQ_HOME}/logs
NAMESERVER_LOG=${LOG_DIR}/namesrv.log
BROKER_LOG=${LOG_DIR}/broker.log
ROCKETMQ_CONSOL=/usr/local/rocketMQ/view

LOCAL_ADDR=$(hostname -i)
CLUSTER_CONFIG_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
BROKER_A=${CLUSTER_CONFIG_HOME}/broker-a.properties
BROKER_A_S=${CLUSTER_CONFIG_HOME}/broker-a-s.properties
BROKER_B=${CLUSTER_CONFIG_HOME}/broker-b.properties
BROKER_B_S=${CLUSTER_CONFIG_HOME}/broker-b-s.properties
BROKER_A_LOG=${LOG_DIR}/broker-a.log
BROKER_A_S_LOG=${LOG_DIR}/broker-a-s.log
BROKER_B_LOG=${LOG_DIR}/broker-b.log
BROKER_B_S_LOG=${LOG_DIR}/broker-b-s.log

usage() {
    echo "********************************************************"
    echo "usage: sh server.sh [start|stop|restart]"
    echo " *1, start * "
    echo " *2, stop * "
    echo " *3, restart * "
    echo "********************************************************"
    read -p "please input option:" option
    case "$option" in
    "1")
        start
        ;;
    "2")
        stop
        ;;
    "3")
        restart
        ;;
    *)
        usage
        ;;
    esac

}

if_exist() {
    pid=$(ps -ef | grep rocketmq-console-ng-2.0.0.jar | grep -v grep | awk '{print $2}')
    if [[ -z ${pid} ]]; then
        return 1
    else
        return 0
    fi
}

start() {
    if [ ! -d ${LOG_DIR} ]; then
        mkdir ${LOG_DIR}
    fi
    cd ${ROCKETMQ_BIN}
    nohup sh mqnamesrv >${NAMESERVER_LOG} 2>&1 &
    echo "The Name Server boot success..."
    sleep 3
    nohup sh mqbroker -c ${BROKER_B} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_B_LOG} 2>&1 &
    echo "The broker[${ADDR}] master boot success..."
    sleep 3
    nohup sh mqbroker -c ${BROKER_A_S} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_A_S_LOG} 2>&1 &
    echo "The broker[${ADDR}] slave boot success..."
    cd ${ROCKETMQ_CONSOL}
    nohup java -jar rocketmq-console-ng-2.0.0.jar >/dev/null &
    sleep 3
    echo "The rocketmq-console-ng-2.0.0.jar in running "
}

stop() {
    cd ${ROCKETMQ_BIN}
    sh mqshutdown broker
    sleep 2
    sh mqshutdown namesrv
    sleep 2
    if_exist
    if [[ $? -eq "0" ]]; then
        for ((i = 0; i < 10; i++)); do
            if_exist
            if [[ $? -eq "0" ]]; then
                kill -9 $pid
                echo "rocketmq-console-ng-2.0.0.jar is stop"
            else
                break
            fi
        done
    else
        echo "rocketmq-console-ng-2.0.0.jar is not running"
    fi
}

restart() {
    stop
    sleep 2
    start
}

case "$1" in
"start")
    start
    ;;
"stop")
    stop
    ;;
"restart")
    restart
    ;;
*)
    usage
    ;;
esac

slave节点服务器的启动脚本

#!/bin/sh
#
# rocketmq - this script starts and stops the rocketmq daemon
#
# chkconfig: - 85 15

ROCKETMQ_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release
ROCKETMQ_BIN=${ROCKETMQ_HOME}/bin
ADDR="192.168.56.101:9876;192.168.56.102:9876"
LOG_DIR=${ROCKETMQ_HOME}/logs
NAMESERVER_LOG=${LOG_DIR}/namesrv.log
ROCKETMQ_CONSOL=/usr/local/rocketMQ/view

LOCAL_ADDR=$(hostname -i)
CLUSTER_CONFIG_HOME=/usr/local/rocketMQ/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
BROKER_A=${CLUSTER_CONFIG_HOME}/broker-a.properties
BROKER_A_S=${CLUSTER_CONFIG_HOME}/broker-a-s.properties
BROKER_B=${CLUSTER_CONFIG_HOME}/broker-b.properties
BROKER_B_S=${CLUSTER_CONFIG_HOME}/broker-b-s.properties
BROKER_A_LOG=${LOG_DIR}/broker-a.log
BROKER_A_S_LOG=${LOG_DIR}/broker-a-s.log
BROKER_B_LOG=${LOG_DIR}/broker-b.log
BROKER_B_S_LOG=${LOG_DIR}/broker-b-s.log

usage() {
    echo "********************************************************"
    echo "usage: sh server.sh [start|stop|restart]"
    echo " *1, start * "
    echo " *2, stop * "
    echo " *3, restart * "
    echo "********************************************************"
    read -p "please input option:" option
    case "$option" in
    "1")
        start
        ;;
    "2")
        stop
        ;;
    "3")
        restart
        ;;
    *)
        usage
        ;;
    esac

}

if_exist() {
    pid=$(ps -ef | grep rocketmq-console-ng-2.0.0.jar | grep -v grep | awk '{print $2}')
    if [[ -z ${pid} ]]; then
        return 1
    else
        return 0
    fi
}

start() {
    if [ ! -d ${LOG_DIR} ]; then
        mkdir ${LOG_DIR}
    fi
    cd ${ROCKETMQ_BIN}
    nohup sh mqnamesrv >${NAMESERVER_LOG} 2>&1 &
    echo "The Name Server boot success..."
    sleep 3
    nohup sh mqbroker -c ${BROKER_A} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_A_LOG} 2>&1 &
    echo "The broker[${ADDR}] master boot success..."
    sleep 3
    nohup sh mqbroker -c ${BROKER_B_S} -n ${ADDR} autoCreateTopicEnable=true >${BROKER_B_S_LOG} 2>&1 &
    echo "The broker[${ADDR}] slave boot success..."
    cd ${ROCKETMQ_CONSOL}
    nohup java -jar rocketmq-console-ng-2.0.0.jar >/dev/null &
    sleep 3
    echo "The rocketmq-console-ng-2.0.0.jar in running "
}

stop() {
    cd ${ROCKETMQ_BIN}
    sh mqshutdown broker
    sleep 2
    sh mqshutdown namesrv
    sleep 2
    if_exist
    if [[ $? -eq "0" ]]; then
        for ((i = 0; i < 10; i++)); do
            if_exist
            if [[ $? -eq "0" ]]; then
                kill -9 $pid
                echo "rocketmq-console-ng-2.0.0.jar is stop"
            else
                break
            fi
        done
    else
        echo "rocketmq-console-ng-2.0.0.jar is not running"
    fi
}

restart() {
    stop
    sleep 2
    start
}

case "$1" in
"start")
    start
    ;;
"stop")
    stop
    ;;
"restart")
    restart
    ;;
*)
    usage
    ;;
esac

进入bin目录下执行:sh mqadmin clusterList -n 192.168.56.101:9876查看当前的集群是否启动成功。

四、集群选举

rocketmq DLedger集群自动选举leader,主从节点切换。

同一组rocketmq Dledger至少需要3个节点,通过raft选举产生leader,其余节点作为follower,实现整个集群的高可用;

如果只有2个节点,当leader节点故障后,由于得不到过半选票,follower节点不会切换为leader节点;

同一集群可部署多个rocketmq dledger group(dLegerGroup),同时对外提供服务

在各自配置文件增加如下配置:

#开启DLedger,默认为false
enableDLegerCommitLog=true
#群组名称,建议和brokerName一致
dLegerGroup=broker-a
#各节点端口信息,如:n0-127.0.0.1:10911;n1-127.0.0.1:10911;n2-127.0.0.1:10911
dLegerPeers:n0-192.168.56.101:10911;n1-192.168.56.102:10911
#节点id,如:n0
dLegerSelfId:n0
#发送线程个数,建议设置成cpu个数
sendMessageThreadPoolNums:2