Skip to content

springboot整合activeMQ

一、添加依赖

<!--        rocketMQ依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.4.0</version>
        </dependency>

二、配置文件增加

自定义配置文件信息

apache:
  rocketmq:
    producer:
      producerGroup: Producer
    consumer:
      PushConsumer: orderConsumer
    namesrvAddr: 192.168.56.66:9876

三、配置生产者

配置producer

@Component
public class RocketProducer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化
     * @author lcy
     * @date 2019/11/13 15:47
     **/
    @PostConstruct
    public void init(){
        //生产者的组名
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);

        try {
            //Producer对象在使用之前必须要调用start初始化,只能初始化一次
            producer.start();

        } catch (Exception e) {
            logger.error("初始化生产者失败。",e);
        }

        // producer.shutdown();  一般在应用上下文,关闭的时候进行关闭,用上下文监听器

    }

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer;

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

}

四、配置消费者

@Component
public class RocketConsumer {

    @PostConstruct
    public void init(){
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);

        try {
            //设置consumer所订阅的Topic和Tag,*代表全部的Tag
            consumer.subscribe("testTopic","*");

            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            //MessageListenerOrderly 这个是有序的
            //MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
            consumer.registerMessageListener((MessageListenerConcurrently)(list,context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });

            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

}

五、队列的使用

@GetMapping("rocketProducer")
    public Map<String,Object> rocketProducer(String message,String tag) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException{
        Map<String,Object> jsonMap = new HashMap<>(16);
        Message msg = new Message("testTopic",tag,message.getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult result = rocketProducer.getProducer().send(msg);
        System.out.println("返回响应id:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
        CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
        return jsonMap;
    }