0

0

Kafka批处理监听器中反序列化异常的重试策略与实现

碧海醫心

碧海醫心

发布时间:2025-09-01 17:25:33

|

588人浏览过

|

来源于php中文网

原创

Kafka批处理监听器中反序列化异常的重试策略与实现

本文详细介绍了如何在Spring Kafka批处理监听器中有效处理并重试反序列化异常。通过修改DefaultErrorHandler以取消对DeserializationException的致命标记,并结合监听器内部对带有null载荷的消息进行异常信息提取和重新抛出,实现对整个批次消息的重试,从而提高Kafka应用的鲁棒性。

Kafka批处理监听器反序列化异常重试机制

在spring kafka应用中,当使用批处理监听器处理消息时,如果消费者在反序列化阶段遇到瞬时错误(例如,avro schema注册中心连接问题),默认情况下这些deserializationexception会被视为致命错误,导致消息无法被重试,进而可能丢失或被跳过。为了增强应用的健壮性,我们需要一套机制来捕获这些异常并触发重试。

默认行为与挑战

Spring Kafka的ErrorHandlingDeserializer在反序列化失败时,通常会返回null作为消息载荷,并将原始异常信息存储在消息头中。然而,默认的DefaultErrorHandler会将DeserializationException视为不可重试的异常类型。这意味着即使ErrorHandlingDeserializer捕获了异常,DefaultErrorHandler也不会触发消息的重试。对于批处理监听器,如果批次中的任何一条消息反序列化失败,整个批次都可能受到影响。

实现反序列化异常重试的步骤

要实现反序列化异常的重试,我们需要分两步进行:

  1. 修改错误处理器,允许DeserializationException重试。
  2. 在监听器中识别并重新抛出反序列化异常。

1. 配置DefaultErrorHandler以允许重试

DefaultErrorHandler默认将DeserializationException标记为不可重试。我们需要通过调用removeClassification方法将其从致命异常列表中移除。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

import java.util.List;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean("myContainerFactory")
    public ConcurrentKafkaListenerContainerFactory createFactory(
            KafkaProperties properties
    ) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(
                new DefaultKafkaConsumerFactory<>(
                        properties.buildConsumerProperties(),
                        new StringDeserializer(),
                        new ErrorHandlingDeserializer<>(new MyDeserializer()) // 使用自定义反序列化器
                )
        );
        factory.getContainerProperties().setAckMode(
                ContainerProperties.AckMode.MANUAL_IMMEDIATE
        );

        // 创建并配置DefaultErrorHandler
        DefaultErrorHandler errorHandler = new DefaultErrorHandler();
        // 移除DeserializationException的致命标记,使其可以重试
        errorHandler.removeClassification(DeserializationException.class);
        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }

    // 模拟偶尔失败的反序列化器
    static class MyDeserializer implements Deserializer {
        private int retries = 0;

        @Override
        public void configure(Map configs, boolean isKey) {
            // No-op
        }

        @Override
        public String deserialize(String topic, byte[] bytes) {
            String s = new String(bytes);
            // 模拟第一次遇到包含"7"的字符串时抛出异常,第二次成功
            if (s.contains("7") && retries == 0) {
                retries = 1; // 标记已尝试一次
                System.out.println("Simulating deserialization error for: " + s);
                throw new DeserializationException("Simulated deserialization failure for: " + s, bytes, false);
            }
            retries = 0; // 重置计数器
            System.out.println("Deserialized successfully: " + s);
            return s;
        }

        @Override
        public void close() {
            // No-op
        }
    }
}

在上述配置中,我们创建了一个DefaultErrorHandler实例,并通过removeClassification(DeserializationException.class)方法明确指示Kafka,当遇到DeserializationException时,不应将其视为致命错误,而是应该尝试重试。

2. 在监听器中处理null载荷并重新抛出异常

即使DefaultErrorHandler被配置为允许重试DeserializationException,ErrorHandlingDeserializer在反序列化失败时仍然会向监听器发送一个null载荷的消息。为了触发批次的重试,监听器需要检查这些null载荷,从消息头中提取原始的反序列化异常,并将其重新抛出。

英特尔AI工具
英特尔AI工具

英特尔AI与机器学习解决方案

下载

批处理监听器需要接收List>而不是List,以便能够访问消息头。

import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.kafka.listener.ListenerUtils;

import java.util.List;
import java.util.Objects;

@Component
public class StringListener {

    @KafkaListener(
            topics = {"string-test"},
            groupId = "test",
            batch = "true",
            containerFactory = "myContainerFactory"
    )
    public void listen(List> messages, Acknowledgment acknowledgment) {
        boolean hasDeserializationError = false;

        for (Message message : messages) {
            String payload = message.getPayload();
            if (payload == null) {
                // 载荷为null,检查是否是反序列化异常
                byte[] exceptionHeader = (byte[]) message.getHeaders().get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
                if (exceptionHeader != null) {
                    DeserializationException deserializationException = 
                        ListenerUtils.byteArrayToDeserializationException(exceptionHeader);

                    if (deserializationException != null) {
                        System.err.println("Detected deserialization error for a message in batch: " + deserializationException.getMessage());
                        hasDeserializationError = true;
                        // 这里不直接抛出,而是标记,待循环结束后统一处理,确保检查完所有消息
                    }
                }
            } else {
                System.out.println("Processed message: " + payload);
            }
        }

        if (hasDeserializationError) {
            // 如果批次中存在反序列化错误,则重新抛出异常,触发整个批次的重试
            System.err.println("Batch contains deserialization errors. Re-throwing to trigger retry.");
            // 抛出任意RuntimeException即可,DefaultErrorHandler会根据配置进行重试
            throw new RuntimeException("Batch failed due to deserialization error(s).");
        }

        // 如果没有反序列化错误,或者所有错误都已处理且不需重试,则提交偏移量
        acknowledgment.acknowledge();
        System.out.println("Batch processed and acknowledged successfully.");
    }
}

在上述监听器代码中:

  • 我们接收List>以便访问消息头。
  • 遍历批次中的每条消息。如果payload为null,则尝试从SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER头中提取原始的DeserializationException。
  • 使用ListenerUtils.byteArrayToDeserializationException()辅助方法将字节数组转换回DeserializationException对象。
  • 如果检测到DeserializationException,则设置hasDeserializationError标志。
  • 在遍历完所有消息后,如果hasDeserializationError为true,则重新抛出一个RuntimeException。这个异常会被DefaultErrorHandler捕获,由于我们已将DeserializationException从致命列表中移除,DefaultErrorHandler会根据其配置的重试策略(例如,指数退避)来重试整个批次。

注意事项:

  • 批次重试的粒度: 重新抛出异常会导致整个批次的消息被重试,而不是仅仅重试失败的那一条消息。这意味着批次中已经成功处理的消息也会被重新处理。在设计业务逻辑时需要考虑幂等性。
  • Spring Kafka版本: 确保使用的Spring Kafka版本支持DefaultErrorHandler的removeClassification方法(通常在2.8.4及更高版本中可用)。对于批处理错误处理器的分类行为,较新的版本(如2.9.x及更高版本)对FallbackBatchErrorHandler的异常分类处理更为完善。
  • 异常类型: 即使是从消息头中提取的DeserializationException,最终在监听器中重新抛出的可以是任何RuntimeException。DefaultErrorHandler会根据其内部的异常分类规则来决定是否重试。

总结

通过上述配置和代码实现,我们成功地为Kafka批处理监听器添加了反序列化异常的重试机制。这使得应用程序能够更优雅地处理瞬时的数据反序列化问题,避免了消息丢失,并提高了系统的容错能力。在实际应用中,应根据业务需求仔细调整DefaultErrorHandler的重试策略(如退避间隔、最大重试次数等),并确保消息处理的幂等性。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

98

2025.08.06

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

166

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

199

2024.02.23

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

312

2023.08.02

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

229

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

434

2024.03.01

class在c语言中的意思
class在c语言中的意思

在C语言中,"class" 是一个关键字,用于定义一个类。想了解更多class的相关内容,可以阅读本专题下面的文章。

456

2024.01.03

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

65

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.2万人学习

C# 教程
C# 教程

共94课时 | 5.7万人学习

Java 教程
Java 教程

共578课时 | 40.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号