rocketMQ
一、rocketMQ特点
Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件 特点:
支持Broker和Consumer端消息过滤
适合金融类业务,高可用性跟踪和审计功能。
支持发布订阅模型,和点对点
支持拉pull和推push两种消息模式
单一队列百万消息
支持单master节点,多master节点,多master多slave节点
消息失败重发机制、支持特定level的定时消息
新版本底层用的netty
4.3.x开始支持分布式事务
官网地址:http://rocketmq.apache.org/
学习资源:
二、rocketMQ概念
name|说明
:---😐:---:
Producer|消息生产者
Producer Group|消息生产者组,发送同类消息的一个消息生产组
Consumer|消费者
Consumer Group|消费同个消息的多个实例
Tag|标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息
Topic|主题,如订阅消息,queue是消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue
Message|消息,每个message必须指定一个topic
Broker|MQ程序,接收生产的消息,提供给消费者消费的程序
Name Server|给生产和消费者提供路由信息,提供轻量级的服务发现和路由
offset|偏移量,可以理解为消息进度
commit log|消息存储会写在commit log文件里
三、rocketMQ消息发送状态
broker消息投递状态
- SEND_OK
消息发送成功
- FLUSH_DISK_TIMEOUT
没有在规定时间内完成刷盘(刷盘策略需要为SYNC_FLUSH同步刷盘策略才会出现)
- FLUSH_SLAVE_TIMEOUT
主从模式下,broker是SYNC_MASTER,没有在规定时间内完成主从同步
- SLAVE_NOT_AVALABLE
从模式下,broker是SYNC_MASTER,但是没有找到被配置成SLAVE的broker
四、延迟消息消费
延迟消息的消费级别,即当生产者发送到rocketMQ服务器上,多长时间以后消费者才能消费。默认是立即消费的情况。
消费的级别在源码的MessageStoreConfig类下的这个属性。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";在发送消息的时候设置message.setDelayTimeLevel(1);1表示1秒以后才能消费,从1开始计算,1表示1秒,2表示5秒。
五、指定队列消费
为了保证队列在高的访问量的时候,不影响其它队列的时候,指定消费的队列。
//指定消息队列消费该消息,后面0的参数表示指定的第一个队列。callback为回调函数
payProducer.getProducer().send(message,
(mqList,msg,arg) -> {
int queueNum = Integer.parseInt(arg.toString());
return mqList.get(queueNum);
},
0,
new SendCallback() {
@Override public void onSuccess(SendResult sendResult){
asyncCallBack(sendResult);
}
@Override public void onException(Throwable throwable){
asyncFailed(throwable);
}
});六、消费者通过用户属性进行消费(不建议使用)
在发送消息的时候增加自定义的用户属性,因为topic是一级标签,tags是二级标签,用户属性可以理解为过滤条件。发送的时候增加属性。
message.putUserProperty("aaa", String.valueOf(100));
这个时候可能rocketMQ集群不支持这个机制,修改配置文件,在配置文件加上配置enablePropertyFilter=true。
消费者类改为PushConsumer,消费属性aaa在300和381之间:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.subscribe("topic1", MessageSelector.bySql("aaa> 300 and aaa between 250 and 381 "));
RocketMQ只实现了一些基础的语法,支持=、>、<、>=、<=、between and、<>、in、is null、is not null、and、or、not。字符串需要使用单引号包起来。
七、偏移量offset
message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个messageQueue里的位置,通过offset的值可以定位到这条消息,或者提示comsumer从这条消息开始向后处理。
message queue中的maxOffset表示消息的最大offset,maxOffset不是最新的消息offset,而是最新的offset+1,minoffset则是现存最小的offset。
fileReserverTime=48默认消息存储48小时以后,消费会被物理的从磁盘删除,message queue的minoffset也就相应的增长。所以比minoffset还要小的消息已经不存储在broker上了, 就无法被消费。
类型(父类是offsetStore):
- 本地文件类型
DefaultMQPushConsumer的BROADCASTING模式,各个consumer互不干扰,使用localFileOffsetStore,把offset存储在本地
- broker代存储类型
DefaultMQPushConsumer的CLUSTERING(集群)模式,有Broker端存储和控制offset的值,使用RemoteBrokerOffsetStore
推荐使用PushConsumer,RoketMQ自动维护offsetStore。如果使用PullConsumer需要自己进行维护offsetStore
八、可靠性传输
rocketMQ的可靠性传输:
producer端
不采用oneway发送,使用同步或异步方式发送,做好重试,但是重试的message key必须唯一
投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体
broker端
双主从架构,nameserver需要多节点
同步双写、异步刷盘(同步刷盘可靠性更高,但是性能略差,根据业务进行选择)
consumer端
消费消息务必保留消费日志,即消息的元数据和消息体
消费端务必做好幂等性处理
投递到broker后
机器端点重启:异步刷盘,消息丢失;同步刷盘消息不丢失
硬件故障:可能存在丢失,看队列架构
九、高性能分析
MQ架构配置
顺序写,随机读,零拷贝
同步刷盘SYNC_FLUSH和异步刷盘ASYNC_FLUSH,通过flushDiskType配置
同步复制和异步复制,通过brokerRole进行配置,ASYNC_MASTER和SYNC_MASTER,SLAVE
推荐同步复制(双写),异步刷盘
发送端高可用
双主双从架构:创建topic的时候,messageQueue创建在多个broker组上,即相同的broker名称,不同的brokerid;当一个maste不可以时,组内其它master仍然可用, 但是机器资源不足的时候,需要手工把slave转成master。4.7版本以后可以使用rocketmq DLedger集群自动选举leader,主从节点切换。
消费者高可用
主从架构:broker角色、master提供读写,slave提供读
consumer不需要配置,当master不可以或者繁忙时,consumer会自动切换到slave节点能读取到
提高消息的消费能力
并行消费
增加多节点
增加单个sonsumer的并行度,修改consumerThreadMin和consumerThreadMax
批量消费,设置Consumer的consumerMessageBatchMaxSize,默认是1,如果为N,则消息多的时候每次收到N条消息
择linux ext4文件系统,ext4文件系统删除1G大小的文件通常耗时51ms,而ext3文件系统耗时1s,删除文件磁盘的IO压力极大,会导致IO超时
