springboot接入kafka
这里使用的是spring-kafka
一、介绍
spring-kafka是基于spring封装的kafka工具包,相比cloud-stream显得更加的灵活,但是cloud-stream更加轻量级,可以和别的中间件无缝切换, spring-kafka可以很好的与业务直接进行相关的接口写入。相较于实时计算工具Spark Streaming、Flink等,kafka streams不适用于大型业务场景。
二、pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>三、生产者
3.1 配置文件
spring:
kafka:
#kafka服务端,多个用,分割
bootstrap-servers: 192.168.1.23:9092
producer:
# 消息重发的次数。
retries: 0
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all3.2 发送消息
这里要注意通过json的字符串之间交互,如果通过对象交换,序列化方式需要进行修改
public class KafkaProducer {
/** kafka */
private final KafkaTemplate<String,Object> kafkaTemplate;
private <T> void sendDbKafka(String topic,T message){
AtomicBoolean sendResult = new AtomicBoolean(true);
kafkaTemplate.send(topic,JSONUtil.toJsonStr(message))
.addCallback(success -> log.info("send kafka message success. topic = {} ",topic)
,failure -> {
log.error("send kafka message failed. topic = {} ",topic);
sendResult.set(false);
});
AssertUtil.isTrue(!sendResult.get(),"send kafka message error");
}
}四、消费者
4.1 配置文件
spring:
kafka:
#kafka服务端,多个用,分割
bootstrap-servers: 192.168.1.23:9092
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
#手工ack,调用ack后立刻提交offset
ack-mode: manual_immediate
#容器运行的线程数
concurrency: 4
#避免出现主题未创建报错
missing-topics-fatal: false4.2 消费消息
这里注意通过json反序列化回原本的对象,很好的与各个服务解耦
@Component
@Slf4j
public class KafkaMessageListener{
@KafkaListener(topics = KafkaTopic.TOPIC, groupId = "${spring.kafka.consumer.group-id}")
public void consumer(Acknowledgment acknowledgment,ConsumerRecord<String,String> consumerRecord){
log.info("consume kafka message. topic = {} ",consumerRecord.topic());
String value = consumerRecord.value();
if (StrUtil.isBlank(value)) {
log.error("kafka message empty. topic = {}",consumerRecord.topic());
return;
}
Object object = JSONUtil.toBean(value,Object.class);
}五、事务消息
Kafka 从 0.11 版本开始引入了事务支持
事务可以保证对多个分区写入操作的原子性
操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
5.1 配置文件修改
spring:
kafka:
producer:
# 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
#retries: 1
#配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
#acks: all
#事务id
transaction-id-prefix: kafka-tran5.2 事务消息
/**
* 注解方式的事务
* @param i
*/
@GetMapping("/kafka/transaction1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage1(int i) {
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1 i="+i);
if (i == 0) {
throw new RuntimeException("fail");
}
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2 i="+i);
}
/**
* 声明式事务支持
* @param i
*/
@GetMapping("/kafka/transaction2")
public void sendMessage2(int i) {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
@Override
public Object doInOperations(KafkaOperations kafkaOperations) {
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1 i="+i);
if(i==0)
{
throw new RuntimeException("input is error");
}
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2 i="+i);
return true;
}
});
}