springboot整合activeMQ
一、添加依赖
<!-- rocketMQ依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.4.0</version>
</dependency>二、配置文件增加
自定义配置文件信息
apache:
rocketmq:
producer:
producerGroup: Producer
consumer:
PushConsumer: orderConsumer
namesrvAddr: 192.168.56.66:9876三、配置生产者
配置producer
@Component
public class RocketProducer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 初始化
* @author lcy
* @date 2019/11/13 15:47
**/
@PostConstruct
public void init(){
//生产者的组名
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
//Producer对象在使用之前必须要调用start初始化,只能初始化一次
producer.start();
} catch (Exception e) {
logger.error("初始化生产者失败。",e);
}
// producer.shutdown(); 一般在应用上下文,关闭的时候进行关闭,用上下文监听器
}
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer;
public DefaultMQProducer getProducer(){
return this.producer;
}
}四、配置消费者
@Component
public class RocketConsumer {
@PostConstruct
public void init(){
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("testTopic","*");
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//MessageListenerOrderly 这个是有序的
//MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
consumer.registerMessageListener((MessageListenerConcurrently)(list,context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 消费者的组名
*/
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
}五、队列的使用
@GetMapping("rocketProducer")
public Map<String,Object> rocketProducer(String message,String tag) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException{
Map<String,Object> jsonMap = new HashMap<>(16);
Message msg = new Message("testTopic",tag,message.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = rocketProducer.getProducer().send(msg);
System.out.println("返回响应id:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
CommonUtil.writeSuccessMap(jsonMap,"消息发送成功");
return jsonMap;
}