消息队列常见问题
一、如何保证有序性
消息的生产和消费顺序一致。
全局顺序:一个topic下全部消息都要有序(少用),性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景,吞吐量不够。
局部顺序:只要保证一组消息被顺序消费即可,性能要求高
如:rocketMQ保证局部顺序,首先生产者,在发布的时候根据唯一表示,比如流水号取模,再放到selector中,同一个模都会投递到相同的queue当中(kafka的是Partition)。 这样就能保证生产者的顺序。
消费者,消费者在注册是时候使用MessageListenerOrderly监听器,这是保证线程的有序消费,消费模式选择消费第一个。
//消费模式--默认消费最后一个。CONSUME_FROM_FIRST_OFFSET是消费第一个
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//监听主题的内容,*代表监听所有
consumer.subscribe(RocketMQTopic.PAY_ORDER_TOPIC,"*");
//注册监听器--多线程的消费注册MessageListenerConcurrently,返回对象为ConsumeConcurrentlyStatus
// 如果是单线程的消费(保证消费有序)使用MessageListenerOrderly,返回对象为ConsumeOrderlyStatus
consumer.registerMessageListener((MessageListenerOrderly)(messages,context) -> {
Message message = messages.get(0);
Log.info("消费消息:{} Receive new Message :{} ",Thread.currentThread().getName(),new String(message.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});二、如何避免重复消费
幂等性:一个请求不管重复多少次,结果是不会有变化
通过redis来控制。
redis的setNX(),做消息id去重。java版本目前不支持过期时间。
redis的incr原子操作,key自增,返回值大于0说明消费过。
数据库的唯一索引来控制。如流水号id、userid重复等等
