Springboot2.x整合rabbitmq
一、spring-AMQP
Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象, 最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter
当有新的AMQP协议的消息队列产品出现,根据这个协议进行消息队列的替换,虽然不能保证百分之百,但是提供了一个很好的接口
二、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>三、配置文件
spring:
rabbitmq:
#集群配置,不能同时与单机一起配置
addresses: 127.0.0.1:5672,127.0.0.1:5673,127.0.0.1:5674
#单机配置
host: 127.0.0.1
port: 5672
virtual-host: /dev
username: guest
password: guest四、配置类
package com.lcy.cloud.video.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.lcy.cloud.video.constant.RabbitmqConstant;
/**
* @Description rabbitmq配置类
* @Author lcy
* @Date 2021/8/4 16:59
*/
@Configuration
public class RabbitMqConfig {
/**
* 交换机对象
* @return org.springframework.amqp.core.Exchange
* @author lcy
* @date 2021/8/4 17:07
**/
@Bean
public Exchange orderExchange(){
return ExchangeBuilder.topicExchange(RabbitmqConstant.ORDER_EXCHANGE_NAME).durable(true).build();
}
/**
* 队列对象
* @return org.springframework.amqp.core.Queue
* @author lcy
* @date 2021/8/4 17:08
**/
@Bean
public Queue orderQueue(){
return QueueBuilder.durable(RabbitmqConstant.ORDER_QUEUE_NAME).build();
}
/**
* 交换机和队列绑定关系
* @param queue 队列
* @param exchange 交换机
* @return org.springframework.amqp.core.Binding
* @author lcy
* @date 2021/8/4 17:08
**/
@Bean
public Binding orderBinding(Queue queue,Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}五、消息
六、生产者
package com.lcy.cloud.video;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.lcy.cloud.video.constant.RabbitmqConstant;
/**
* @Description 订单生产者
* @Author lcy
* @Date 2021/8/4 17:10
*/
@SpringBootTest(classes = VideoApplication.class)
public class OrderProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void order(){
rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order");
}
}七、消费者
package com.lcy.cloud.video.listener;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.lcy.cloud.video.constant.RabbitmqConstant;
/**
* @Description 订单消费者
* @Author lcy
* @Date 2021/8/4 17:22
*/
@Component
@RabbitListener(queues = RabbitmqConstant.ORDER_QUEUE_NAME)
public class OrderListener {
@RabbitHandler
public void releaseCouponRecord(String msg,Message message) throws IOException{
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag=" + msgTag);
System.out.println("message=" + message);
System.out.println("监听到消息:消息内容:" + new String(message.getBody()));
}
}八、消息可靠性投递
九、发送者到broker(交换机)
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated@Test
void testConfirmCallback(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData,boolean ack,String cause){
System.out.println("confirm=====>");
System.out.println("confirm==== ack=" + ack);
System.out.println("confirm==== cause=" + cause);
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
});
rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order");
}十、broker到queue队列
第一步:在配置文件开启全局的
#一定要开启
spring.rabbitmq.publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true也可以通过一下的方式不设置全局
rabbitTemplate.setMandatory(true);@Test
void testReturnCallback(){
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message,int i,String s,String s1,String s2){
System.out.println("message" + new String(message.getBody()));
System.out.println("i" + message);
System.out.println("s" + s);
System.out.println("s1" + s1);
System.out.println("s2" + s2);
}
});
rabbitTemplate.convertAndSend(RabbitmqConstant.ORDER_EXCHANGE_NAME,"order.new","new order return message");
}