消息队列(JMS)
一、JMS介绍
什么是JMS:java消息服务(Java Message Service),java平台中关于消息中间件的接口
JMS是一种与厂商无关的API,用来访问消息,收发系统消息,它类似JDBC,如jdbc可以用来访问许多不同关系数据库的API
二、使用场景
核心应用
解耦:传统模式系统之间的耦合性太强,系统A和系统B之间通信需要连接,A之间调用B的代码执行,如果有别的系统接入,A还要改造。改用消息队列可以直接解耦。将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。如:订单系统->物流系统
异步:A系统调用B的代码的时候,是一种同步的方式,A会一直等待,直到B响应。使用消息队列则是异步进行操作。如:用户注册->发送邮件,初始化信息。
削峰:当并发量大的时候,大量请求进来的情况下,传统模式需要保证高并发的场景,消息队列则慢慢拉取消息处理。如:秒杀、日志处理
跨平台、多语言
分布式事务、最终一致性
RPC调用上下游对接,数据源变动->通知下属
三、概念和模型
常见概念
JMS提供者:连接面向消息中间件的,JMS接口的一个实现,rocketMQ、activeMQ、kafka等等。
JMS生产者(Message Producer):生产消息的服务
JMS消费者(Message Consumer):消费消息的服务
JMS消息:数据对象
JMS队列:存储待消费消息的区域
JMS主题:一种支持发送多个消息给多个订阅者的机制
JMS消息通常有两种类型:点对点(point-to-point)、发布/订阅(publish/Subscribe)
基础变成模型
MQ中常用的一些类
ConnectionFactory:连接工厂,JMS用它创建连接
Connection:JMS客户端到MS provider的连接
Session:一个发送或接收消息的线程
Detination:消息的目的地,消息发送给谁
MessageConsumer/MessageProducer:消息消费者,消息生产者
四、使用消息队列的缺点
消息队列是一个中间件,如果消息队列服务挂了,A与B的系统都正常,这时候是无法处理业务,系统的可用性会降低。
加入了消息队列,要保证消息的一致性,会使系统变得更加复杂,要考虑消息丢失处理,消息重发等等处理。
五、消息队列如何选型
既然在项目中用了MQ,肯定事先要对业界流行的MQ进行调研,如果连每种MQ的优缺点都没没了解清楚,就拍脑袋依据喜好来选用某种MQ,这是给项目挖坑。
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
| ---------- | ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| 开发语言 | java | erlang | java | scala |
| 单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
| 时效性 | ms级 | us级 | ms级 | ms级以内 |
| 可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
| 功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
| 特点 | apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言java、c++、.net等,基于JMS Provider的实现 | 是一个开源的AMQP实现,服务器用Erlang语言编写,支持多种客户端如python、java等用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错 | 阿里开源的一款中间件,纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。性能强劲(零拷贝技术),支持海量堆积,支持指定次数和时间间隔的失败消息重发,支持comsumer端tag过滤、延迟消息等。 | 由apache软件基金会开发的开源流处理平台,由scala和java编写。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模网站中的所有动作流数据,副本集机制,实现数据冗余,保障数据不丢失;支持多个生产者和消费者 |
| 缺点 | 吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用 | 使用erlang开发,阅读和修改源码难度大 | | 不支持批量和广播消息,运维难度大,文档比较少 |
六、如何保证消息队列是高可用的
通过集群模式保证高可用,越多的节点,吞吐率越高
七、如何保证消息不被重复消费
即保证保证消息的幂等性操作。
插入的时候做唯一主键的判断,类似redis的set操作,不管消费几次,数据都是一致的。或者通过第三方介质,比如redis,消费开始前从redis判断是否已经被消费过,消费结束了以后再redis加一个会超时的记录。
//下面两个方法不适合用于分布式锁,消费的时候要进行幂等性操作,保证消费重复不影响 //redis booleanflag=jedis.setNX(key); if(flag){ //未消费,进行消费 }else{ //已消费,忽略 } //redis原子性判断 intnum=jedis.incr(key); if(num==1){ //未消费,进行消费 }else{ //已消费,忽略 }数据库去重表
新建一张表,用来判断是否消费。将交易的业务id当做主键,如果消费了,像表里插入一条记录,如果重复消费,因为业务id是主键,则插入不进去。回滚操作。如果表中已经有数据则不消费消息。
//数据库操作
intsave=mapper.save(entity);
if(save>0){
//未消费的消息,消费
}else{
//已消费,不进行重复消费
}八、消息堆积处理
消息堆积了很久,有几千万条消息待处理,是要修复consumer然后慢慢消费?也需要好几个小时,新的消息也会发送进来。怎么解决?
临时topic队列扩容,提供消费者能力,如果增加了consumer的数量,topic里的queue数量也需要增加,否则过多的消息不会分配到消费者当中。
临时编写分发程序,从旧的topic快速读取到临时新topic当中,新topic的queue数量要扩容,然后启动更多的consumer对临时的topic进行消费。
