0

0

Spring EmbeddedKafka 生产者等待消费者确认的实现方案

花韻仙語

花韻仙語

发布时间:2025-10-12 12:53:00

|

856人浏览过

|

来源于php中文网

原创

spring embeddedkafka 生产者等待消费者确认的实现方案

本文旨在解决 Spring EmbeddedKafka 测试场景下,生产者如何等待消费者确认消息的问题。由于 Kafka 生产者和消费者是独立的,`acks` 仅保证 Broker 接收并持久化消息,与消费者无关。因此,需要自定义逻辑实现生产者等待消费者确认的功能。本文将介绍实现此功能的思路和方法。

在 Spring EmbeddedKafka 环境下,确保生产者发送的消息被消费者正确处理并确认是一个常见的需求,尤其是在集成测试中。然而,Kafka 的设计本身是生产者和消费者解耦的,生产者端的 acks 配置仅仅控制 Broker 端的确认机制,并不能直接实现生产者等待消费者确认的功能。因此,我们需要引入额外的机制来实现这一目标。

核心思路:引入中间状态同步机制

由于生产者和消费者是独立的,我们需要一种方式让消费者在处理完消息后通知生产者。常见的做法是引入一个共享的状态存储,例如:

  • 共享的 ConcurrentHashMap: 适用于单 JVM 测试环境,简单高效。
  • Redis 或其他外部存储: 适用于分布式测试环境,更具扩展性。

实现步骤:

  1. 生产者端:

    • 发送消息前,生成一个唯一的 ID (例如 UUID)。
    • 将该 ID 作为消息头或者消息体的一部分发送给消费者。
    • 将该 ID 存储到一个等待确认的 Map 中,并设置一个超时时间。
    • 定期检查该 ID 是否从 Map 中移除,如果超时未移除,则认为消息处理失败。
  2. 消费者端:

    Quinvio AI
    Quinvio AI

    AI辅助下快速创建视频,虚拟代言人

    下载
    • 接收到消息后,提取消息中的 ID。
    • 处理完消息后,将该 ID 从共享的状态存储中移除。
    • 调用 Acknowledgement.acknowledge() 确认消息已被消费。

示例代码 (使用 ConcurrentHashMap):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Component
public class MessageHandler {

    private final KafkaTemplate kafkaTemplate;
    private final ConcurrentHashMap processedMessages = new ConcurrentHashMap<>();

    public MessageHandler(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) throws Exception {
        String messageId = UUID.randomUUID().toString();
        processedMessages.put(messageId, false); // 添加到等待确认的 Map

        kafkaTemplate.send(topic, messageId, message).get(); // 确保消息发送成功

        // 等待消费者确认,设置超时时间
        waitForConfirmation(messageId, 5, TimeUnit.SECONDS);
    }

    public void consumeMessage(String messageId, String message, Acknowledgment acknowledgment) {
        try {
            // 处理消息...
            System.out.println("Consumed message: " + message);
            processedMessages.remove(messageId); // 移除ID,表示已确认
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常...
        }
    }

    private void waitForConfirmation(String messageId, long timeout, TimeUnit unit) throws Exception {
        long startTime = System.currentTimeMillis();
        while (processedMessages.containsKey(messageId)) {
            if (System.currentTimeMillis() - startTime > unit.toMillis(timeout)) {
                throw new Exception("Timeout waiting for message confirmation: " + messageId);
            }
            Thread.sleep(100); // 短暂休眠,避免 CPU 占用过高
        }
    }
}

配置 KafkaListener:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private final MessageHandler messageHandler;

    public KafkaConsumer(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @KafkaListener(topics = "your_topic", groupId = "your_group_id")
    public void listen(String messageId, String message, Acknowledgment acknowledgment) {
        messageHandler.consumeMessage(messageId, message, acknowledgment);
    }
}

注意事项:

  • 超时机制: 必须设置合理的超时时间,避免无限期等待。
  • 异常处理: 在消费者端,需要妥善处理消息处理失败的情况,例如重新入队或者记录错误日志。
  • 消息 ID 的唯一性: 确保消息 ID 在整个系统中是唯一的。
  • 并发问题: 如果多个生产者同时发送消息,需要考虑并发问题,例如使用线程安全的 Map 或者分布式锁。

总结:

虽然 Kafka 本身不提供生产者等待消费者确认的机制,但我们可以通过引入中间状态同步机制来实现这一功能。上述示例代码提供了一种基于 ConcurrentHashMap 的实现方案,可以根据实际需求选择更合适的方案,例如使用 Redis 或者其他外部存储。 关键在于生产者和消费者之间建立一个共享的状态,用于同步消息的处理状态。 通过这种方式,可以有效地提高集成测试的可靠性,确保消息被正确处理。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

106

2025.08.06

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

327

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

233

2023.10.07

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

482

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

菜鸟裹裹入口以及教程汇总
菜鸟裹裹入口以及教程汇总

本专题整合了菜鸟裹裹入口地址及教程分享,阅读专题下面的文章了解更多详细内容。

0

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.8万人学习

C# 教程
C# 教程

共94课时 | 7.3万人学习

Java 教程
Java 教程

共578课时 | 49.5万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号