
在spring kafka应用中,当使用批处理监听器处理消息时,如果消费者在反序列化阶段遇到瞬时错误(例如,avro schema注册中心连接问题),默认情况下这些deserializationexception会被视为致命错误,导致消息无法被重试,进而可能丢失或被跳过。为了增强应用的健壮性,我们需要一套机制来捕获这些异常并触发重试。
Spring Kafka的ErrorHandlingDeserializer在反序列化失败时,通常会返回null作为消息载荷,并将原始异常信息存储在消息头中。然而,默认的DefaultErrorHandler会将DeserializationException视为不可重试的异常类型。这意味着即使ErrorHandlingDeserializer捕获了异常,DefaultErrorHandler也不会触发消息的重试。对于批处理监听器,如果批次中的任何一条消息反序列化失败,整个批次都可能受到影响。
要实现反序列化异常的重试,我们需要分两步进行:
DefaultErrorHandler默认将DeserializationException标记为不可重试。我们需要通过调用removeClassification方法将其从致命异常列表中移除。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import java.util.List;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Bean("myContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> createFactory(
KafkaProperties properties
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(
properties.buildConsumerProperties(),
new StringDeserializer(),
new ErrorHandlingDeserializer<>(new MyDeserializer()) // 使用自定义反序列化器
)
);
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE
);
// 创建并配置DefaultErrorHandler
DefaultErrorHandler errorHandler = new DefaultErrorHandler();
// 移除DeserializationException的致命标记,使其可以重试
errorHandler.removeClassification(DeserializationException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
// 模拟偶尔失败的反序列化器
static class MyDeserializer implements Deserializer<String> {
private int retries = 0;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No-op
}
@Override
public String deserialize(String topic, byte[] bytes) {
String s = new String(bytes);
// 模拟第一次遇到包含"7"的字符串时抛出异常,第二次成功
if (s.contains("7") && retries == 0) {
retries = 1; // 标记已尝试一次
System.out.println("Simulating deserialization error for: " + s);
throw new DeserializationException("Simulated deserialization failure for: " + s, bytes, false);
}
retries = 0; // 重置计数器
System.out.println("Deserialized successfully: " + s);
return s;
}
@Override
public void close() {
// No-op
}
}
}在上述配置中,我们创建了一个DefaultErrorHandler实例,并通过removeClassification(DeserializationException.class)方法明确指示Kafka,当遇到DeserializationException时,不应将其视为致命错误,而是应该尝试重试。
即使DefaultErrorHandler被配置为允许重试DeserializationException,ErrorHandlingDeserializer在反序列化失败时仍然会向监听器发送一个null载荷的消息。为了触发批次的重试,监听器需要检查这些null载荷,从消息头中提取原始的反序列化异常,并将其重新抛出。
批处理监听器需要接收List<Message<String>>而不是List<String>,以便能够访问消息头。
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.kafka.listener.ListenerUtils;
import java.util.List;
import java.util.Objects;
@Component
public class StringListener {
@KafkaListener(
topics = {"string-test"},
groupId = "test",
batch = "true",
containerFactory = "myContainerFactory"
)
public void listen(List<Message<String>> messages, Acknowledgment acknowledgment) {
boolean hasDeserializationError = false;
for (Message<String> message : messages) {
String payload = message.getPayload();
if (payload == null) {
// 载荷为null,检查是否是反序列化异常
byte[] exceptionHeader = (byte[]) message.getHeaders().get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
if (exceptionHeader != null) {
DeserializationException deserializationException =
ListenerUtils.byteArrayToDeserializationException(exceptionHeader);
if (deserializationException != null) {
System.err.println("Detected deserialization error for a message in batch: " + deserializationException.getMessage());
hasDeserializationError = true;
// 这里不直接抛出,而是标记,待循环结束后统一处理,确保检查完所有消息
}
}
} else {
System.out.println("Processed message: " + payload);
}
}
if (hasDeserializationError) {
// 如果批次中存在反序列化错误,则重新抛出异常,触发整个批次的重试
System.err.println("Batch contains deserialization errors. Re-throwing to trigger retry.");
// 抛出任意RuntimeException即可,DefaultErrorHandler会根据配置进行重试
throw new RuntimeException("Batch failed due to deserialization error(s).");
}
// 如果没有反序列化错误,或者所有错误都已处理且不需重试,则提交偏移量
acknowledgment.acknowledge();
System.out.println("Batch processed and acknowledged successfully.");
}
}在上述监听器代码中:
注意事项:
通过上述配置和代码实现,我们成功地为Kafka批处理监听器添加了反序列化异常的重试机制。这使得应用程序能够更优雅地处理瞬时的数据反序列化问题,避免了消息丢失,并提高了系统的容错能力。在实际应用中,应根据业务需求仔细调整DefaultErrorHandler的重试策略(如退避间隔、最大重试次数等),并确保消息处理的幂等性。
以上就是Kafka批处理监听器中反序列化异常的重试策略与实现的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号