0

0

处理 Kafka 批量监听器反序列化错误的重试机制

聖光之護

聖光之護

发布时间:2025-09-01 18:05:02

|

207人浏览过

|

来源于php中文网

原创

处理 kafka 批量监听器反序列化错误的重试机制

本文将介绍如何在 Kafka 批量监听器中配置和实现反序列化错误的重试机制。如摘要所述,默认情况下,DeserializationException 被认为是致命错误,不会进行重试。但是,通过适当的配置和代码实现,我们可以改变这种行为,使 Kafka 能够在遇到反序列化错误时自动重试,从而提高系统的健壮性。

移除默认的致命异常

Spring Kafka 提供了 DefaultErrorHandler 类来处理 Kafka 监听器中的异常。默认情况下,DeserializationException 被包含在不会重试的异常列表中。要启用反序列化错误的重试,首先需要从这个列表中移除 DeserializationException。

@org.springframework.context.annotation.Configuration
@EnableKafka
public class Configuration {
    @Bean("myContainerFactory")
     public ConcurrentKafkaListenerContainerFactory createFactory(
             KafkaProperties properties
    ) {
        var factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(
                new DefaultKafkaConsumerFactory(
                        properties.buildConsumerProperties(),
                        new StringDeserializer(),
                        new ErrorHandlingDeserializer(new MyDeserializer())
                )
        );
        factory.getContainerProperties().setAckMode(
                ContainerProperties.AckMode.MANUAL_IMMEDIATE
        );
        DefaultErrorHandler errorHandler = new DefaultErrorHandler();
        errorHandler.removeClassification(DeserializationException.class);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }

    // this fakes occasional errors which succeed after a retry
    static class MyDeserializer implements Deserializer {
        int retries = 0;
        @Override
        public String deserialize(String topic, byte[] bytes) {
            String s = new String(bytes);
            if (s.contains("7") && retries == 0) {
                retries = 1;
                throw new RuntimeException();
            }
            retries = 0;
            return s;
        }
    }
}

在上面的代码中,我们创建了一个 DefaultErrorHandler 实例,并使用 removeClassification(DeserializationException.class) 方法从不会重试的异常列表中移除了 DeserializationException。然后,我们将这个配置好的 errorHandler 设置到 ConcurrentKafkaListenerContainerFactory 中。

批量监听器中的异常处理

对于批量监听器,当发生反序列化错误时,需要获取到具体的异常信息,并将其重新抛出,才能触发重试机制。可以通过以下两种方式获取异常信息:

  1. Consume List>: 如果监听器消费的是 List>,则可以通过 Message 对象的 header 获取异常信息。 使用 SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER 获取header中的异常信息。

  2. 使用 @Header: 可以在监听器方法中添加一个额外的参数,并使用 @Header 注解来获取异常信息。

   @Component
   public class StringListener {
       @KafkaListener(
               topics = {"string-test"},
               groupId = "test",
               batch = "true",
               containerFactory = "myContainerFactory"
       )
       public void listen(List> messages, Acknowledgment acknowledgment) {
           for (Message message: messages) {
               try {
                   String s = message.getPayload();
                   System.out.println(s);
               } catch (Exception e) {
                   // 获取反序列化异常
                   byte[] exceptionBytes = (byte[]) message.getHeaders().get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
                   DeserializationException deserializationException = byteArrayToDeserializationException(exceptionBytes);

                   // 重新抛出异常,触发重试
                   throw new ListenerExecutionFailedException("Deserialization failed", deserializationException);
               }
           }
           acknowledgment.acknowledge();
       }

       private DeserializationException byteArrayToDeserializationException(byte[] bytes) {
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois;
            try {
                ois = new ObjectInputStream(bais);
                return (DeserializationException) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Failed to deserialize exception from byte array", e);
            }
        }
   }

注意事项:

  • 如果使用@Header方法获取异常,需要确保header中存在SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER。
  • 需要手动将byte数组反序列化为DeserializationException对象。

总结

通过以上步骤,我们可以配置 Kafka 批量监听器,使其在遇到反序列化错误时自动重试。首先,需要从 DefaultErrorHandler 的默认致命异常列表中移除 DeserializationException。然后,在批量监听器中,需要捕获异常,获取异常信息,并将其重新抛出,才能触发重试机制。这种方法可以有效地处理间歇性的反序列化问题,提高 Kafka 消费的稳定性和可靠性。需要注意的是,可以配置重试次数和重试间隔,以避免无限重试。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

相关标签:

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

98

2025.08.06

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

166

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

199

2024.02.23

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

312

2023.08.02

class在c语言中的意思
class在c语言中的意思

在C语言中,"class" 是一个关键字,用于定义一个类。想了解更多class的相关内容,可以阅读本专题下面的文章。

456

2024.01.03

python中class的含义
python中class的含义

本专题整合了python中class的相关内容,阅读专题下面的文章了解更多详细内容。

6

2025.12.06

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

3

2025.12.31

php网站源码教程大全
php网站源码教程大全

本专题整合了php网站源码相关教程,阅读专题下面的文章了解更多详细内容。

1

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号