Spring Boot与Kafka深度整合指南(springboot与kafka版本)
1 项目构建
- 项目环境搭建
创建一个SpringBoot的项目, 添加如下依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 在application.yml文件中配置kafka
spring:
kafka:
bootstrap-servers: worker1:9092 # kafka集群地址,多个地址使用逗号分割
consumer:
group-id: order-group
- 编写简单生产消息的代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public void send(String topic,Object msg){
kafkaTemplate.send(topic,msg);
}
public void send(String topic,String key, Object msg){
kafkaTemplate.send(topic,key,msg);
}
}
- 编写消费代码
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"topic-order"},groupId="${spring.kafka.consumer.group-id}")
public void listen(String message) {
System.out.println("收到消息:"+message);
}
}
- 编写Controller进行测试
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping("/order/{id}")
public String addOrder(@PathVariable("id") String id) {
System.out.println("创建订单,id="+id);
kafkaProducer.send("topic-order",id);
return "order";
}
}
- 我们访问Controller, 在浏览器输入: http://localhost:8080/order/114
2 生产者
2.1 带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法
第一种写法:
public void send(String topic,Object msg){
kafkaTemplate.send(topic, msg).addCallback(new SuccessCallback<SendResult<String, Object>>() {
//成功回调
@Override
public void onSuccess(SendResult<String, Object> success) {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);
}
}, new FailureCallback() {
//失败回调
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败1:" + throwable.getMessage());
}
});
}
第二种写法:
public void send(String topic,Object msg){
kafkaTemplate.send("sb_topic", msg).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败2:"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
2.2 监听器
Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。
ProducerListener监听器作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:
package org.springframework.kafka.support;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.lang.Nullable;
public interface ProducerListener<K, V> {
//该方法是在消息达到brocker,brocker应答之后才会触发
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
//当消息发送失败时触发
default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
}
}
下面我们来编写一个监听器
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener<String,Object> {
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
log.info("监听器 onError 执行 ProducerRecord= {} , exception = {}", producerRecord, exception);
}
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("KafkaProducerListener 发送成功 "+producerRecord.toString());
}
}
接着在定义KafkaTemplate的时候需要添加ProducerListener
import com.fs.kafka.demo3.listener.KafkaProducerListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
* kafka生产者配置类
*/
@Slf4j
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Autowired
private KafkaProducerListener kafkaProducerListener;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
//反序列化,和生产者的序列化方式对应
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
return factory;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
}
3 消费者
3.1 指定topic、partition、offset消费
前面我们在监听消费topic-order的时候,监听的是topic-order上所有的消息。如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic-order和topic2,监听topic-order的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
属性解释:
* id:消费者ID
* groupId:消费组ID(这个配置项是用来指定消费者组ID的,它使得多个消费者实例可以协同工作,共同消费主题中的消息,并实现负载均衡、容错性和扩展性等功能)
* topics:监听的topic,可监听多个
* topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听
**/
@KafkaListener(id = "consumer1",groupId = "${spring.kafka.consumer.group-id}",topicPartitions = {
@TopicPartition(topic = "topic-order", partitions = {"0"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void listen(String message) {
System.out.println("收到消息:"+message);
}
}
注意:
topics和topicPartitions不能同时使用
我们在消费者类中可以使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。
@KafkaListener(topics = {"topic-order"},groupId="${spring.kafka.consumer.group-id}")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("收到消息:"+record);
}
在 Kafka 消息传递过程中,消息包含了消息头(headers)*和*消息体(payload)。可以通过 @Header 注解来获取消息头,通过 @Payload 获取消息体。
@KafkaListener(topics = {"topic-order"},groupId="${spring.kafka.consumer.group-id}")
public void listen(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
System.out.println("收到消息,消费主体:"+message+",topic:"+topic+",partition:"+partition+",offset:"+offset);
}
3.2 批量消费
请注意:当Kafka监听器被配置为批量消费模式时,它接收的消息格式将是一个包含多个ConsumerRecord 对象的列表。在这种配置下,监听器不再直接支持处理单个ConsumerRecord 对象的方法签名。
第一步: 设置application.yml开启批量消费即可
spring:
application:
name: kafka-demo1
kafka:
bootstrap-servers: worker1:9092 # kafka集群地址,多个地址使用逗号分割
consumer:
group-id: order-group
max-poll-records: 50 #一次拉取50条消息
listener:
type: batch #批量消费
第二步: 编写消费者代码
// 接收消息时用List来接收,监听代码如下
@KafkaListener(id = "c1",groupId = "${spring.kafka.consumer.group-id}", topics = "topic-order")
public void onMessage(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
3.3 异常处理
通过 ConsumerAwareListenerErrorHandler 异常处理器 ,我们可以处理consumer在消费时发生的异常。
具体步骤:
1. 新建一个
ConsumerAwareListenerErrorHandler 类型的异常处理方法。2. 用@Bean注入,BeanName默认就是方法名。
3. 然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面。
4. 当监听抛出异常的时候,则会自动调用异常处理器。
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
System.out.println("消费异常:" + message.getPayload());
return null;
}
};
}
}
在消费者的消费方法上添加@KafkaListener注解的errorHandler属性里面
// 接收消息时用List来接收,监听代码如下
@KafkaListener(id = "c1",groupId = "${spring.kafka.consumer.group-id}", topics = "topic-order",errorHandler = "consumerAwareErrorHandler")
public void onMessage(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
int a = 1/0;//模拟抛出异常
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
3.4 ACK 机制(手动提交偏移量)
Kafka 消费消息有自动提交和手动提交两种模式。自动提交是默认模式,但你可以通过手动控制 acknowledgement 来实现手动确认机制。
手动提交目前有两种模式
MANUAL :对性能要求高(推荐)
MANUAL_IMMEDIATE:对数据一致性要求高
第一步: 在application.properties关闭自动提交,设置为手动提交
# 关闭自动提交
spring.kafka.consumer.enable-auto-commit=false
# 手动ack模式
spring.kafka.listener.ack-mode=MANUAL
第二步: 消费消息的时候,给方法添加 Acknowledgment 参数签收消息,同时执行 acknowledge 方法
@KafkaListener(id = "c1",groupId = "${spring.kafka.consumer.group-id}", topics = "topic-order",errorHandler = "consumerAwareErrorHandler")
public void onMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
//提交offset
acknowledgment.acknowledge();
}
在这个示例中,ack.acknowledge() 用于手动提交偏移量,告诉 Kafka 该消息已经成功处理。这样可以防止消息在未成功处理时被自动确认。