死信队列
一、介绍
TTL:time to live 消息存活时间
如果消息在存活时间内未被消费,则会别清除
两种ttl设置
单独消息进行配置ttl
整个队列进行配置ttl(居多)
**死信队列:**没有被及时消费的消息存放的队列
**死信交换机(Dead Letter Exchange,缩写:DLX):**当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机。
成为死信的情况
消费者拒收消息**(basic.reject/ basic.nack)**,并且没有重新入队 requeue=false
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
队列的消息长度达到极限,如队列长度为1000,满了没有被消费,后面来的消息就成为死信
**结果:**消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
二、死信队列创建流程
新建死信交换机(本质上就是普通交换机),选择topic类型和virtual host
新建死信队列(本质上普通队列),队列选择virtual host
死信交换机绑和队列绑定
新建普通队列,设置过期时间、指定死信交换机。配置arguments参数
X-message-ttl:过期时间
X-dead-letter-exchange:死信交换机
X-dead-letter-routing-key:投递的死信队列
三、应用场景--延迟队列
**延迟队列:**一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费, 该消息即定时消息。
使用场景
通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略
延迟队列的实现方式:
定时任务高精度轮训
采用RocketMQ自带延迟消息功能
RabbitMQ结合死信队列达到延迟队列消费
四、实现
五、配置文件类
package com.lcy.cloud.video.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.lcy.cloud.video.constant.RabbitmqConstant;
/**
* @Description rabbitmq配置类--死信队列
* @Author lcy
* @Date 2021/8/4 16:59
*/
@Configuration
public class RabbitMqDeadExchangeConfig {
/**
* 死信交换机
* @return org.springframework.amqp.core.Exchange
* @author lcy
* @date 2021/8/8 12:31
**/
@Bean
public Exchange lockDeadExchange(){
return new TopicExchange(RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME,true,false);
}
/**
* 死信队列
* @return org.springframework.amqp.core.Queue
* @author lcy
* @date 2021/8/8 12:32
**/
@Bean
public Queue lockDeadQueue(){
return QueueBuilder.durable(RabbitmqConstant.LOCK_DEAD_QUEUE_NAME).build();
}
/**
* 绑定死信交换机和死信队列
* @return org.springframework.amqp.core.Binding
* @author lcy
* @date 2021/8/8 12:32
**/
@Bean
public Binding lockBinding(){
//绑定的名称,绑定的类型,绑定的交换机名称,路由键,参数
return new Binding(RabbitmqConstant.LOCK_DEAD_QUEUE_NAME,Binding.DestinationType.QUEUE,
RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME,RabbitmqConstant.LOCK_DEAD_ROUTING_KEY,null);
}
/**
* 普通交换机
* @return org.springframework.amqp.core.Exchange
* @author lcy
* @date 2021/8/8 12:34
**/
@Bean
public Exchange newExchange(){
return new TopicExchange(RabbitmqConstant.LOCK_EXCHANGE_NAME,true,false);
}
/**
* 普通队列
* @return org.springframework.amqp.core.Queue
* @author lcy
* @date 2021/8/8 12:35
**/
@Bean
public Queue newQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange",RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key",RabbitmqConstant.LOCK_DEAD_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl",10000);
return QueueBuilder.durable(RabbitmqConstant.LOCK_QUEUE_NAME).withArguments(args).build();
}
/**
* 绑定交换机和队列
* @return org.springframework.amqp.core.Binding
* @author lcy
* @date 2021/8/8 12:36
**/
@Bean
public Binding newBinding(){
return new Binding(RabbitmqConstant.LOCK_QUEUE_NAME,Binding.DestinationType.QUEUE,
RabbitmqConstant.LOCK_EXCHANGE_NAME,RabbitmqConstant.LOCK_ROUTING_KEY,null);
}
}六、生产者
package com.lcy.cloud.video.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.lcy.base.common.response.Result;
import com.lcy.base.common.util.ResultUtil;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.service.VideoService;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
/**
* @Description Controller
* @Author lcy
* @Date 2021-07-29
*/
@RestController
@RequestMapping("/api/v1/video")
@Api(tags = {"服务入口"})
@AllArgsConstructor
public class VideoController {
/**
* 接口
**/
private final VideoService videoService;
/**
* rabbitmq
**/
private final RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage")
public Result<Object> sendMessage(){
//发送消息到指定队列
rabbitTemplate.convertAndSend(RabbitmqConstant.LOCK_EXCHANGE_NAME,RabbitmqConstant.LOCK_ROUTING_KEY,"send message");
return ResultUtil.success();
}
}七、死信消费者
package com.lcy.cloud.video.listener;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.rabbitmq.client.Channel;
/**
* @Description 死信消费者
* @Author lcy
* @Date 2021/8/4 17:22
*/
@Component
@RabbitListener(queues = RabbitmqConstant.LOCK_DEAD_QUEUE_NAME)
public class LockDeadListener {
@RabbitHandler
public void releaseCouponRecord(String msg,Message message,Channel channel) throws IOException{
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag=" + msgTag);
System.out.println("message=" + message);
System.out.println("监听到死信消息:消息内容:" + new String(message.getBody()));
//false表示是否多条确认
channel.basicAck(msgTag,false);
}
}