Skip to content

死信队列

一、介绍

TTL:time to live 消息存活时间

如果消息在存活时间内未被消费,则会别清除

两种ttl设置

  • 单独消息进行配置ttl

  • 整个队列进行配置ttl(居多)

**死信队列:**没有被及时消费的消息存放的队列

**死信交换机(Dead Letter Exchange,缩写:DLX):**当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机。

成为死信的情况

  • 消费者拒收消息**(basic.reject/ basic.nack)**,并且没有重新入队 requeue=false

  • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)

  • 队列的消息长度达到极限,如队列长度为1000,满了没有被消费,后面来的消息就成为死信

**结果:**消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

二、死信队列创建流程

  1. 新建死信交换机(本质上就是普通交换机),选择topic类型和virtual host

  2. 新建死信队列(本质上普通队列),队列选择virtual host

  3. 死信交换机绑和队列绑定

  4. 新建普通队列,设置过期时间、指定死信交换机。配置arguments参数

    • X-message-ttl:过期时间

    • X-dead-letter-exchange:死信交换机

    • X-dead-letter-routing-key:投递的死信队列

三、应用场景--延迟队列

**延迟队列:**一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费, 该消息即定时消息。

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息

  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;

  • 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

延迟队列的实现方式:

  • 定时任务高精度轮训

  • 采用RocketMQ自带延迟消息功能

  • RabbitMQ结合死信队列达到延迟队列消费

四、实现

五、配置文件类

package com.lcy.cloud.video.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.lcy.cloud.video.constant.RabbitmqConstant;

/**
 * @Description rabbitmq配置类--死信队列
 * @Author lcy
 * @Date 2021/8/4 16:59
 */
@Configuration
public class RabbitMqDeadExchangeConfig {

    /**
     * 死信交换机
     * @return org.springframework.amqp.core.Exchange
     * @author lcy
     * @date 2021/8/8 12:31
     **/
    @Bean
    public Exchange lockDeadExchange(){
        return new TopicExchange(RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME,true,false);
    }

    /**
     * 死信队列
     * @return org.springframework.amqp.core.Queue
     * @author lcy
     * @date 2021/8/8 12:32
     **/
    @Bean
    public Queue lockDeadQueue(){
        return QueueBuilder.durable(RabbitmqConstant.LOCK_DEAD_QUEUE_NAME).build();
    }

    /**
     * 绑定死信交换机和死信队列
     * @return org.springframework.amqp.core.Binding
     * @author lcy
     * @date 2021/8/8 12:32
     **/
    @Bean
    public Binding lockBinding(){
        //绑定的名称,绑定的类型,绑定的交换机名称,路由键,参数
        return new Binding(RabbitmqConstant.LOCK_DEAD_QUEUE_NAME,Binding.DestinationType.QUEUE,
                RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME,RabbitmqConstant.LOCK_DEAD_ROUTING_KEY,null);
    }

    /**
     * 普通交换机
     * @return org.springframework.amqp.core.Exchange
     * @author lcy
     * @date 2021/8/8 12:34
     **/
    @Bean
    public Exchange newExchange(){
        return new TopicExchange(RabbitmqConstant.LOCK_EXCHANGE_NAME,true,false);
    }

    /**
     * 普通队列
     * @return org.springframework.amqp.core.Queue
     * @author lcy
     * @date 2021/8/8 12:35
     **/
    @Bean
    public Queue newQueue(){

        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",RabbitmqConstant.LOCK_DEAD_EXCHANGE_NAME);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key",RabbitmqConstant.LOCK_DEAD_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl",10000);

        return QueueBuilder.durable(RabbitmqConstant.LOCK_QUEUE_NAME).withArguments(args).build();
    }

    /**
     * 绑定交换机和队列
     * @return org.springframework.amqp.core.Binding
     * @author lcy
     * @date 2021/8/8 12:36
     **/
    @Bean
    public Binding newBinding(){
        return new Binding(RabbitmqConstant.LOCK_QUEUE_NAME,Binding.DestinationType.QUEUE,
                RabbitmqConstant.LOCK_EXCHANGE_NAME,RabbitmqConstant.LOCK_ROUTING_KEY,null);
    }

}

六、生产者

package com.lcy.cloud.video.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lcy.base.common.response.Result;
import com.lcy.base.common.util.ResultUtil;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.service.VideoService;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;

/**
 * @Description Controller
 * @Author lcy
 * @Date 2021-07-29
 */
@RestController
@RequestMapping("/api/v1/video")
@Api(tags = {"服务入口"})
@AllArgsConstructor
public class VideoController {

    /**
     * 接口
     **/
    private final VideoService videoService;

    /**
     * rabbitmq
     **/
    private final RabbitTemplate rabbitTemplate;

    @GetMapping("sendMessage")
    public Result<Object> sendMessage(){
        //发送消息到指定队列
        rabbitTemplate.convertAndSend(RabbitmqConstant.LOCK_EXCHANGE_NAME,RabbitmqConstant.LOCK_ROUTING_KEY,"send message");
        return ResultUtil.success();
    }

}

七、死信消费者

package com.lcy.cloud.video.listener;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.rabbitmq.client.Channel;

/**
 * @Description 死信消费者
 * @Author lcy
 * @Date 2021/8/4 17:22
 */
@Component
@RabbitListener(queues = RabbitmqConstant.LOCK_DEAD_QUEUE_NAME)
public class LockDeadListener {

    @RabbitHandler
    public void releaseCouponRecord(String msg,Message message,Channel channel) throws IOException{
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message);
        System.out.println("监听到死信消息:消息内容:" + new String(message.getBody()));
        //false表示是否多条确认
        channel.basicAck(msgTag,false);
    }

}