springboot整合activeMQ
一、添加依赖
<!-- 整合消息队列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果配置线程池则加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>二、配置文件增加
activemq:
#默认端口号61616
broker-url: tcp://127.0.0.1:61616
#集群配置
broker-url: failover:(tcp://localhost:61616,tcp://localhost:61617)
user: admin
password: admin
pool:
enabled: true
max-connections: 100
#支持发布订阅模型,默认只支持点对点
jms:
pub-sub-domain: true三、开启注解
在启动类或者config类增加注解@EnableJms
四、配置生产者
如果需要配置默认的Queue,在配置类下新增bean
//队列
@Bean
public Queue queue(){
return new ActiveMQQueue("connect.queue");
}
//发布订阅
@Bean
public Topic topic(){
return new ActiveMQTopic("video.topic");
}
//配置修改工厂配置,即同时支持点对点和发布订阅模型
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}配置Service和实现类。
//配置发送消息的队列
@Override public void sendMessage(Destination destination,String message){
jmsTemplate.convertAndSend(destination,message);
}
//配置默认发送消息的队列
@Override public void sendMessage(String message){
jmsTemplate.convertAndSend(this.queue,message);
}
//配置默认发布订阅
@Override public void publish(String message){
this.jmsTemplate.convertAndSend(this.topic,message);
}五、配置消费者
@Component
public class JmsConsumer {
//监听器监听消息队列,如果有消息发送过来,触发该方法
@JmsListener(destination = "connect.order")
public void receiveQueue(String message){
System.out.println(LocalDateTime.now()+":"+message);
}
//订阅消息,pub-sub-domain: true配置开启的情况下
@JmsListener(destination = "video.topic")
public void receiveTopic1(String message){
System.out.println(LocalDateTime.now()+" receive1:"+message);
}
//订阅消息,pub-sub-domain: true配置开启的情况下
@JmsListener(destination = "video.topic")
public void receiveTopic2(String message){
System.out.println(LocalDateTime.now()+" receive2:"+message);
}
//订阅消息,不开启配置,同时支持点对点和发布订阅模型
@JmsListener(destination = "video.topic",containerFactory="jmsListenerContainerTopic")
public void receiveTopic3(String message){
System.out.println(LocalDateTime.now()+" receive3:"+message);
}
}六、队列的使用
@GetMapping("order")
public Map<String,Object> order(String message){
Map<String,Object> jsonMap = new HashMap<>(16);
Destination destination = new ActiveMQQueue("connect.order");
jmsService.sendMessage(destination,message);
CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
return jsonMap;
}
@GetMapping("commonOrder")
public Map<String,Object> common(String message){
Map<String,Object> jsonMap = new HashMap<>(16);
jmsService.sendMessage(message);
CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
return jsonMap;
}
@GetMapping("topic")
public Map<String,Object> topic(String message){
Map<String,Object> jsonMap = new HashMap<>(16);
jmsService.publish(message);
CommonUtil.writeSuccessMap(jsonMap,"订阅消息发送成功");
return jsonMap;
}