Skip to content

springboot整合activeMQ

一、添加依赖

<!-- 整合消息队列ActiveMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 如果配置线程池则加入 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

二、配置文件增加

activemq:
 #默认端口号61616
 broker-url: tcp://127.0.0.1:61616
 #集群配置
 broker-url: failover:(tcp://localhost:61616,tcp://localhost:61617)
 user: admin
 password: admin
 pool:
     enabled: true
     max-connections: 100
#支持发布订阅模型,默认只支持点对点
 jms:
  pub-sub-domain: true

三、开启注解

在启动类或者config类增加注解@EnableJms

四、配置生产者

如果需要配置默认的Queue,在配置类下新增bean

//队列
@Bean
   public Queue queue(){
       return new ActiveMQQueue("connect.queue");
   }
//发布订阅
   @Bean
       public Topic topic(){
           return new ActiveMQTopic("video.topic");
       }

//配置修改工厂配置,即同时支持点对点和发布订阅模型
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
    DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    bean.setPubSubDomain(true);
    bean.setConnectionFactory(activeMQConnectionFactory);
    return bean;
}

配置Service和实现类。

//配置发送消息的队列
@Override public void sendMessage(Destination destination,String message){
    jmsTemplate.convertAndSend(destination,message);
}
//配置默认发送消息的队列
@Override public void sendMessage(String message){
    jmsTemplate.convertAndSend(this.queue,message);
}
//配置默认发布订阅
@Override public void publish(String message){
       this.jmsTemplate.convertAndSend(this.topic,message);
   }

五、配置消费者

@Component
public class JmsConsumer {

//监听器监听消息队列,如果有消息发送过来,触发该方法
    @JmsListener(destination = "connect.order")
    public void receiveQueue(String message){
        System.out.println(LocalDateTime.now()+":"+message);
    }
//订阅消息,pub-sub-domain: true配置开启的情况下
    @JmsListener(destination = "video.topic")
        public void receiveTopic1(String message){
            System.out.println(LocalDateTime.now()+" receive1:"+message);
        }
//订阅消息,pub-sub-domain: true配置开启的情况下
        @JmsListener(destination = "video.topic")
        public void receiveTopic2(String message){
            System.out.println(LocalDateTime.now()+" receive2:"+message);
        }
//订阅消息,不开启配置,同时支持点对点和发布订阅模型
@JmsListener(destination = "video.topic",containerFactory="jmsListenerContainerTopic")
   public void receiveTopic3(String message){
       System.out.println(LocalDateTime.now()+" receive3:"+message);
   }
}

六、队列的使用

@GetMapping("order")
    public Map<String,Object> order(String message){
        Map<String,Object> jsonMap = new HashMap<>(16);
        Destination destination = new ActiveMQQueue("connect.order");
        jmsService.sendMessage(destination,message);
        CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
        return jsonMap;
    }

    @GetMapping("commonOrder")
    public Map<String,Object> common(String message){
        Map<String,Object> jsonMap = new HashMap<>(16);
        jmsService.sendMessage(message);
        CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
        return jsonMap;
    }

    @GetMapping("topic")
    public Map<String,Object> topic(String message){
        Map<String,Object> jsonMap = new HashMap<>(16);
        jmsService.publish(message);
        CommonUtil.writeSuccessMap(jsonMap,"订阅消息发送成功");
        return jsonMap;
    }