Skip to content

rocketMQ

一、rocketMQ特点

Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件 特点:

  1. 支持Broker和Consumer端消息过滤

  2. 适合金融类业务,高可用性跟踪和审计功能。

  3. 支持发布订阅模型,和点对点

  4. 支持拉pull和推push两种消息模式

  5. 单一队列百万消息

  6. 支持单master节点,多master节点,多master多slave节点

  7. 消息失败重发机制、支持特定level的定时消息

  8. 新版本底层用的netty

  9. 4.3.x开始支持分布式事务

官网地址:http://rocketmq.apache.org/

学习资源:

  1. http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/

  2. https://www.jianshu.com/p/453c6e7ff81c

二、rocketMQ概念

name|说明

:---😐:---:

Producer|消息生产者

Producer Group|消息生产者组,发送同类消息的一个消息生产组

Consumer|消费者

Consumer Group|消费同个消息的多个实例

Tag|标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息

Topic|主题,如订阅消息,queue是消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue

Message|消息,每个message必须指定一个topic

Broker|MQ程序,接收生产的消息,提供给消费者消费的程序

Name Server|给生产和消费者提供路由信息,提供轻量级的服务发现和路由

offset|偏移量,可以理解为消息进度

commit log|消息存储会写在commit log文件里

三、rocketMQ消息发送状态

broker消息投递状态

  • SEND_OK

消息发送成功

  • FLUSH_DISK_TIMEOUT

没有在规定时间内完成刷盘(刷盘策略需要为SYNC_FLUSH同步刷盘策略才会出现)

  • FLUSH_SLAVE_TIMEOUT

主从模式下,broker是SYNC_MASTER,没有在规定时间内完成主从同步

  • SLAVE_NOT_AVALABLE

从模式下,broker是SYNC_MASTER,但是没有找到被配置成SLAVE的broker

四、延迟消息消费

延迟消息的消费级别,即当生产者发送到rocketMQ服务器上,多长时间以后消费者才能消费。默认是立即消费的情况。

消费的级别在源码的MessageStoreConfig类下的这个属性。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

在发送消息的时候设置message.setDelayTimeLevel(1);1表示1秒以后才能消费,从1开始计算,1表示1秒,2表示5秒。

五、指定队列消费

为了保证队列在高的访问量的时候,不影响其它队列的时候,指定消费的队列。

//指定消息队列消费该消息,后面0的参数表示指定的第一个队列。callback为回调函数
            payProducer.getProducer().send(message,
                    (mqList,msg,arg) -> {
                        int queueNum = Integer.parseInt(arg.toString());
                        return mqList.get(queueNum);
                    },
                    0,
                    new SendCallback() {

                        @Override public void onSuccess(SendResult sendResult){
                            asyncCallBack(sendResult);
                        }

                        @Override public void onException(Throwable throwable){
                            asyncFailed(throwable);
                        }
                    });

六、消费者通过用户属性进行消费(不建议使用)

在发送消息的时候增加自定义的用户属性,因为topic是一级标签,tags是二级标签,用户属性可以理解为过滤条件。发送的时候增加属性。

message.putUserProperty("aaa", String.valueOf(100));

这个时候可能rocketMQ集群不支持这个机制,修改配置文件,在配置文件加上配置enablePropertyFilter=true。

消费者类改为PushConsumer,消费属性aaa在300和381之间:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");

consumer.subscribe("topic1", MessageSelector.bySql("aaa> 300 and aaa between 250 and 381 "));

RocketMQ只实现了一些基础的语法,支持=、>、<、>=、<=、between and、<>、in、is null、is not null、and、or、not。字符串需要使用单引号包起来。

七、偏移量offset

  • message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个messageQueue里的位置,通过offset的值可以定位到这条消息,或者提示comsumer从这条消息开始向后处理。

  • message queue中的maxOffset表示消息的最大offset,maxOffset不是最新的消息offset,而是最新的offset+1,minoffset则是现存最小的offset。

fileReserverTime=48默认消息存储48小时以后,消费会被物理的从磁盘删除,message queue的minoffset也就相应的增长。所以比minoffset还要小的消息已经不存储在broker上了, 就无法被消费。

类型(父类是offsetStore):

  • 本地文件类型

DefaultMQPushConsumer的BROADCASTING模式,各个consumer互不干扰,使用localFileOffsetStore,把offset存储在本地

  • broker代存储类型

DefaultMQPushConsumer的CLUSTERING(集群)模式,有Broker端存储和控制offset的值,使用RemoteBrokerOffsetStore

推荐使用PushConsumer,RoketMQ自动维护offsetStore。如果使用PullConsumer需要自己进行维护offsetStore

八、可靠性传输

rocketMQ的可靠性传输:

  • producer端

    • 不采用oneway发送,使用同步或异步方式发送,做好重试,但是重试的message key必须唯一

    • 投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体

  • broker端

    • 双主从架构,nameserver需要多节点

    • 同步双写、异步刷盘(同步刷盘可靠性更高,但是性能略差,根据业务进行选择)

  • consumer端

    • 消费消息务必保留消费日志,即消息的元数据和消息体

    • 消费端务必做好幂等性处理

  • 投递到broker后

    • 机器端点重启:异步刷盘,消息丢失;同步刷盘消息不丢失

    • 硬件故障:可能存在丢失,看队列架构

九、高性能分析

  • MQ架构配置

    • 顺序写,随机读,零拷贝

    • 同步刷盘SYNC_FLUSH和异步刷盘ASYNC_FLUSH,通过flushDiskType配置

    • 同步复制和异步复制,通过brokerRole进行配置,ASYNC_MASTER和SYNC_MASTER,SLAVE

    • 推荐同步复制(双写),异步刷盘

  • 发送端高可用

双主双从架构:创建topic的时候,messageQueue创建在多个broker组上,即相同的broker名称,不同的brokerid;当一个maste不可以时,组内其它master仍然可用, 但是机器资源不足的时候,需要手工把slave转成master。4.7版本以后可以使用rocketmq DLedger集群自动选举leader,主从节点切换。

  • 消费者高可用

    • 主从架构:broker角色、master提供读写,slave提供读

    • consumer不需要配置,当master不可以或者繁忙时,consumer会自动切换到slave节点能读取到

  • 提高消息的消费能力

    • 并行消费

      • 增加多节点

      • 增加单个sonsumer的并行度,修改consumerThreadMin和consumerThreadMax

      • 批量消费,设置Consumer的consumerMessageBatchMaxSize,默认是1,如果为N,则消息多的时候每次收到N条消息

    • 择linux ext4文件系统,ext4文件系统删除1G大小的文件通常耗时51ms,而ext3文件系统耗时1s,删除文件磁盘的IO压力极大,会导致IO超时