kafka消费重试

2021-10-17   


kafka消费失败重试

前言

  借助于spring boot,对于kafka消息的生成和消费变得十分简单。之前的业务代码中,当消费者消费消息发生异常时仅做了异常的日志记载并手动ack了消息。kafka是否提供了类似RabbitMQ的死信队列模式呢?

简介

  通过配置SeekToCurrentErrorHandler的DeadLetterPublishingRecoverer与BackOff,可以手动控制在消费消息重试次数达到最大配置次数后,将消息ACK并投入到死信队列。

demo

config

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${kafka.consumer.retry.interval:10000}")
    private long interval;

    @Value("${kafka.consumer.retry.maxAttempts:3}")
    private long maxAttempts;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory, DefaultProducerListener defaultProducerListener) {
        KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory);
        return kafkaTemplate;
    }

    @Bean
    public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
        //投递到死信队列 死信队列topic名称默认为原topicName+.DLT
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        //设置重试间隔及次数
        BackOff backOff = new FixedBackOff(interval, maxAttempts);
        return new SeekToCurrentErrorHandler(recoverer, backOff);
    }

}
其他

  如果不想将消息投递到死信队列,可以将recoverer设置为null或者自定义BigConsumer接口的实现来执行达到最大重试次数后的处理逻辑。

Q.E.D.