
本文旨在解决 Spring EmbeddedKafka 测试场景下,生产者如何等待消费者确认消息的问题。由于 Kafka 生产者和消费者是独立的,`acks` 仅保证 Broker 接收并持久化消息,与消费者无关。因此,需要自定义逻辑实现生产者等待消费者确认的功能。本文将介绍实现此功能的思路和方法。
在 Spring EmbeddedKafka 环境下,确保生产者发送的消息被消费者正确处理并确认是一个常见的需求,尤其是在集成测试中。然而,Kafka 的设计本身是生产者和消费者解耦的,生产者端的 acks 配置仅仅控制 Broker 端的确认机制,并不能直接实现生产者等待消费者确认的功能。因此,我们需要引入额外的机制来实现这一目标。
核心思路:引入中间状态同步机制
由于生产者和消费者是独立的,我们需要一种方式让消费者在处理完消息后通知生产者。常见的做法是引入一个共享的状态存储,例如:
实现步骤:
生产者端:
消费者端:
示例代码 (使用 ConcurrentHashMap):
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
public class MessageHandler {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ConcurrentHashMap<String, Boolean> processedMessages = new ConcurrentHashMap<>();
public MessageHandler(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) throws Exception {
String messageId = UUID.randomUUID().toString();
processedMessages.put(messageId, false); // 添加到等待确认的 Map
kafkaTemplate.send(topic, messageId, message).get(); // 确保消息发送成功
// 等待消费者确认,设置超时时间
waitForConfirmation(messageId, 5, TimeUnit.SECONDS);
}
public void consumeMessage(String messageId, String message, Acknowledgment acknowledgment) {
try {
// 处理消息...
System.out.println("Consumed message: " + message);
processedMessages.remove(messageId); // 移除ID,表示已确认
acknowledgment.acknowledge();
} catch (Exception e) {
// 处理异常...
}
}
private void waitForConfirmation(String messageId, long timeout, TimeUnit unit) throws Exception {
long startTime = System.currentTimeMillis();
while (processedMessages.containsKey(messageId)) {
if (System.currentTimeMillis() - startTime > unit.toMillis(timeout)) {
throw new Exception("Timeout waiting for message confirmation: " + messageId);
}
Thread.sleep(100); // 短暂休眠,避免 CPU 占用过高
}
}
}配置 KafkaListener:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
private final MessageHandler messageHandler;
public KafkaConsumer(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@KafkaListener(topics = "your_topic", groupId = "your_group_id")
public void listen(String messageId, String message, Acknowledgment acknowledgment) {
messageHandler.consumeMessage(messageId, message, acknowledgment);
}
}注意事项:
总结:
虽然 Kafka 本身不提供生产者等待消费者确认的机制,但我们可以通过引入中间状态同步机制来实现这一功能。上述示例代码提供了一种基于 ConcurrentHashMap 的实现方案,可以根据实际需求选择更合适的方案,例如使用 Redis 或者其他外部存储。 关键在于生产者和消费者之间建立一个共享的状态,用于同步消息的处理状态。 通过这种方式,可以有效地提高集成测试的可靠性,确保消息被正确处理。
以上就是Spring EmbeddedKafka 生产者等待消费者确认的实现方案的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号