CloudStream接入kafka
一、介绍
Spring Cloud Stream官方的说法是一个构建消息驱动微服务的框架。简单来说就是mq的连接、发送、消费等方式都封装成统一的接口,使用者无需关注接入的细节,只需要使用相关的中间件操作即可。 目前官方声明cloudStream支持的mq:kafka、rabbitmq、rocketMQ,基本上对于常用的mq都有了很好的支持。
关系图可以如下:

springcloudstream通过binder绑定消息中间件,然后指定管道进行发送消息和监听消息。
二、项目构建
这里以springboot和springcloud项目为基本条件引入
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>三、配置文件
kafka文档:
由于不同版本的配置稍微有些不太一致,需要与文档对应。以3.0.x版本的yml为例:
spring:
cloud:
stream:
kafka:
binder:
#Kafka的消息中间件服务器
brokers: 192.168.1.23:9092
#如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
auto-create-topics: true
bindings:
#消息管道
topic-name:
consumer:
ackMmode: MANUAL_IMMEDIATE
ackEachRecord: true
#手动确认消息,不能单独存在,需要结合ackEachRecord才生效,详细看官网文档介绍
autoCommitOffset: false
poller:
#默认1000 一秒以内不会重复消费
fixed-delay: 2000
bindings:
#消息管道,这里可以任意写,消费者与生产者应一致。并且与上面的kafka bindings一致
topic-name:
#消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
contentType: application/json
#指定消费者分组名称
group: topic-group3.1 注意事项
3.2 消息组
设置管道的消息组会触发kafka的rebalance平衡机制,同组内的消息只会被一个消费者消费一次。如果没有设置消息组,多个不同组的会多次进行消费 kafka开启消息确认以后,不同消息组的消息如果监听了都会消费,但是管道消息组内的消息在没有进行ack确认之前,消息不会被消费,但是同消息组的在线消费者不会再次消费,只有当消费者重新加载实例的时候, 才进行消费。
四、公共配置类
注意除了topic生产者和消费者可以通过同一个类获取以外,生产者和消费者相关的操作分开到不同的类,否则会出现问题
Topic类
/**
* @Description TODO
* @Author lcy
* @Date 2021/1/12 10:18
*/
public class KafkaTopic {
/** topic名称 */
public static final String TOPIC_NAME = "topic";
}五、消息生产者
管道类
/**
* @Description kafka输出管道--消息生产者者管道
* @Author lcy
* @Date 2021/1/12 10:20
*/
public interface KafkaOutputChanel {
@Output(KafkaTopic.TOPIC_NAME)
MessageChannel outputChannel();
}发送消息类:
/**
* @Description 消息生产者
* @Author lcy
* @Date 2021/1/12 10:22
*/
@Component
@Slf4j
@EnableBinding(KafkaOutputChannel.class)
public class KafkaMessageProducer {
@Autowired
private KafkaOutputChanel kafkaOutputChanel;
public void sendMessage(){
kafkaOutputChanel.outputChannel().send(MessageBuilder.withPayload(new Object()).build());
}
}六、消息消费者
管道类
/**
* @Description kafka输入管道--消息消费者管道
* @Author lcy
* @Date 2021/1/12 10:20
*/
public interface KafkaInputChanel {
@Input(KafkaTopic.TOPIC_NAME)
SubscribableChannel inputChannel();
}消费者类
/**
* @Description 消息监听者--消费消息
* @Author lcy
* @Date 2021/1/12 10:22
*/
@Component
@Slf4j
@EnableBinding(KafkaInputChannel.class)
public class KafkaMessageListener {
@StreamListener(KafkaTopic.TOPIC_NAME)
public void listen(Message<?> message,Object messageObject){
//消费消息内容
//如果开启手动确认消息,对象不可能为空,则手动确认
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,Acknowledgment.class);
if (acknowledgment != null) {
acknowledgment.acknowledge();
log.info("consume kafka message successfully.topic= {} ,message = {} ",KafkaTopic.TOPIC_NAME,messageObject);
}
}
}