Skip to content

Springboot2.x整合rabbitmq

一、spring-AMQP

Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象, 最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发

总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter

当有新的AMQP协议的消息队列产品出现,根据这个协议进行消息队列的替换,虽然不能保证百分之百,但是提供了一个很好的接口

二、依赖

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

三、配置文件

spring:
  rabbitmq:
      #集群配置,不能同时与单机一起配置
      addresses: 127.0.0.1:5672,127.0.0.1:5673,127.0.0.1:5674
      #单机配置
    host: 127.0.0.1
    port: 5672
    virtual-host: /dev
    username: guest
    password: guest

四、配置类

package com.lcy.cloud.video.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.lcy.cloud.video.constant.RabbitmqConstant;

/**
 * @Description rabbitmq配置类
 * @Author lcy
 * @Date 2021/8/4 16:59
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 交换机对象
     * @return org.springframework.amqp.core.Exchange
     * @author lcy
     * @date 2021/8/4 17:07
     **/
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(RabbitmqConstant.ORDER_EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 队列对象
     * @return org.springframework.amqp.core.Queue
     * @author lcy
     * @date 2021/8/4 17:08
     **/
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(RabbitmqConstant.ORDER_QUEUE_NAME).build();
    }

    /**
     * 交换机和队列绑定关系
     * @param queue    队列
     * @param exchange 交换机
     * @return org.springframework.amqp.core.Binding
     * @author lcy
     * @date 2021/8/4 17:08
     **/
    @Bean
    public Binding orderBinding(Queue queue,Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

}

五、消息

六、生产者

package com.lcy.cloud.video;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import com.lcy.cloud.video.constant.RabbitmqConstant;

/**
 * @Description 订单生产者
 * @Author lcy
 * @Date 2021/8/4 17:10
 */
@SpringBootTest(classes = VideoApplication.class)
public class OrderProduct {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void order(){
        rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order");
    }

}

七、消费者

package com.lcy.cloud.video.listener;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lcy.cloud.video.constant.RabbitmqConstant;

/**
 * @Description 订单消费者
 * @Author lcy
 * @Date 2021/8/4 17:22
 */
@Component
@RabbitListener(queues = RabbitmqConstant.ORDER_QUEUE_NAME)
public class OrderListener {

    @RabbitHandler
    public void releaseCouponRecord(String msg,Message message) throws IOException{
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message);
        System.out.println("监听到消息:消息内容:" + new String(message.getBody()));
    }

}

八、消息可靠性投递

九、发送者到broker(交换机)

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
@Test
void testConfirmCallback(){
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

        /**
         *
         * @param correlationData 配置
         * @param ack 交换机是否收到消息,true是成功,false是失败
         * @param cause 失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData,boolean ack,String cause){
            System.out.println("confirm=====>");
            System.out.println("confirm==== ack=" + ack);
            System.out.println("confirm==== cause=" + cause);
            if (ack) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败");
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order");
}

十、broker到queue队列

第一步:在配置文件开启全局的

#一定要开启
spring.rabbitmq.publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true

也可以通过一下的方式不设置全局

rabbitTemplate.setMandatory(true);
@Test
void testReturnCallback(){
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

        @Override public void returnedMessage(Message message,int i,String s,String s1,String s2){
            System.out.println("message" + new String(message.getBody()));
            System.out.println("i" + message);
            System.out.println("s" + s);
            System.out.println("s1" + s1);
            System.out.println("s2" + s2);
        }
    });
    rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order return message");
}