Skip to content

CloudStream接入kafka

一、介绍

Spring Cloud Stream官方的说法是一个构建消息驱动微服务的框架。简单来说就是mq的连接、发送、消费等方式都封装成统一的接口,使用者无需关注接入的细节,只需要使用相关的中间件操作即可。 目前官方声明cloudStream支持的mq:kafka、rabbitmq、rocketMQ,基本上对于常用的mq都有了很好的支持。

关系图可以如下:

image-20220608201534183

springcloudstream通过binder绑定消息中间件,然后指定管道进行发送消息和监听消息。

二、项目构建

这里以springboot和springcloud项目为基本条件引入

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

三、配置文件

kafka文档:

由于不同版本的配置稍微有些不太一致,需要与文档对应。以3.0.x版本的yml为例:

spring:
  cloud:
    stream:
      kafka:
        binder:
          #Kafka的消息中间件服务器
          brokers: 192.168.1.23:9092
          #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          auto-create-topics: true
        bindings:
          #消息管道
          topic-name:
            consumer:
              ackMmode: MANUAL_IMMEDIATE
              ackEachRecord: true
              #手动确认消息,不能单独存在,需要结合ackEachRecord才生效,详细看官网文档介绍
              autoCommitOffset: false
      poller:
        #默认1000 一秒以内不会重复消费
        fixed-delay: 2000
      bindings:
        #消息管道,这里可以任意写,消费者与生产者应一致。并且与上面的kafka bindings一致
        topic-name:
          #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
          contentType: application/json
          #指定消费者分组名称
          group: topic-group

3.1 注意事项

3.2 消息组

设置管道的消息组会触发kafka的rebalance平衡机制,同组内的消息只会被一个消费者消费一次。如果没有设置消息组,多个不同组的会多次进行消费 kafka开启消息确认以后,不同消息组的消息如果监听了都会消费,但是管道消息组内的消息在没有进行ack确认之前,消息不会被消费,但是同消息组的在线消费者不会再次消费,只有当消费者重新加载实例的时候, 才进行消费。

四、公共配置类

注意除了topic生产者和消费者可以通过同一个类获取以外,生产者和消费者相关的操作分开到不同的类,否则会出现问题

Topic类

/**
 * @Description TODO
 * @Author lcy
 * @Date 2021/1/12 10:18
 */
public class KafkaTopic {

    /** topic名称 */
    public static final String TOPIC_NAME = "topic";

}

五、消息生产者

管道类

/**
 * @Description kafka输出管道--消息生产者者管道
 * @Author lcy
 * @Date 2021/1/12 10:20
 */
public interface KafkaOutputChanel {

    @Output(KafkaTopic.TOPIC_NAME)
    MessageChannel outputChannel();
}

发送消息类:

/**
 * @Description 消息生产者
 * @Author lcy
 * @Date 2021/1/12 10:22
 */
@Component
@Slf4j
@EnableBinding(KafkaOutputChannel.class)
public class KafkaMessageProducer {

    @Autowired
    private KafkaOutputChanel kafkaOutputChanel;

    public void sendMessage(){
        kafkaOutputChanel.outputChannel().send(MessageBuilder.withPayload(new Object()).build());
    }

}

六、消息消费者

管道类

/**
 * @Description kafka输入管道--消息消费者管道
 * @Author lcy
 * @Date 2021/1/12 10:20
 */
public interface KafkaInputChanel {

    @Input(KafkaTopic.TOPIC_NAME)
    SubscribableChannel inputChannel();
}

消费者类

/**
 * @Description 消息监听者--消费消息
 * @Author lcy
 * @Date 2021/1/12 10:22
 */
@Component
@Slf4j
@EnableBinding(KafkaInputChannel.class)
public class KafkaMessageListener {

    @StreamListener(KafkaTopic.TOPIC_NAME)
    public void listen(Message<?> message,Object messageObject){
        //消费消息内容
        //如果开启手动确认消息,对象不可能为空,则手动确认
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,Acknowledgment.class);
        if (acknowledgment != null) {
            acknowledgment.acknowledge();
            log.info("consume kafka message successfully.topic= {} ,message = {} ",KafkaTopic.TOPIC_NAME,messageObject);
        }
    }

}