kafka消费失败重试
前言
借助于spring boot,对于kafka消息的生成和消费变得十分简单。之前的业务代码中,当消费者消费消息发生异常时仅做了异常的日志记载并手动ack了消息。kafka是否提供了类似RabbitMQ的死信队列模式呢?
简介
通过配置SeekToCurrentErrorHandler的DeadLetterPublishingRecoverer与BackOff,可以手动控制在消费消息重试次数达到最大配置次数后,将消息ACK并投入到死信队列。
demo
config
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${kafka.consumer.retry.interval:10000}")
private long interval;
@Value("${kafka.consumer.retry.maxAttempts:3}")
private long maxAttempts;
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory, DefaultProducerListener defaultProducerListener) {
KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
return kafkaTemplate;
}
@Bean
public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
//投递到死信队列 死信队列topic名称默认为原topicName+.DLT
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
//设置重试间隔及次数
BackOff backOff = new FixedBackOff(interval, maxAttempts);
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
}
其他
如果不想将消息投递到死信队列,可以将recoverer设置为null或者自定义BigConsumer接口的实现来执行达到最大重试次数后的处理逻辑。
Q.E.D.