首页 > Java > java教程 > 正文

使用 Reactor Kafka 消费指定范围消息后停止 Consumer

花韻仙語
发布: 2025-09-13 22:39:01
原创
435人浏览过

使用 reactor kafka 消费指定范围消息后停止 consumer

本文介绍了如何使用 Reactor Kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。

在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。

代码示例

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public class KafkaConsumerExample {

    public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) {
        TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition

        // 配置 Consumer 属性
        Map<String, Object> consumerProps = Map.of(
                "bootstrap.servers", bootstrapServers,
                "group.id", groupId,
                "key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
                "value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
                "auto.offset.reset", "earliest" // 从最早的 Offset 开始消费
        );

        // 创建 ReceiverOptions
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));

        // 创建 ReactiveKafkaConsumerTemplate
        ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer = new ReactiveKafkaConsumerTemplate<>(receiverOptions);

        // 消费消息并停止 Consumer
        return kafkaConsumer
                .receive()
                .flatMap(record -> {
                    // 获取当前 Partition 的最新 Offset
                    Mono<Map<TopicPartition, Long>> endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition)));

                    return endOffsetsMono.map(topicPartitionToLastOffset -> {
                        long lastOffset = topicPartitionToLastOffset.get(topicPartition);
                        return new RecordWithLastOffset(record, lastOffset);
                    });
                })
                .takeUntil(recordWithLastOffset -> recordWithLastOffset.record.offset() >= (recordWithLastOffset.lastOffset - 1))
                .subscribe(recordWithLastOffset -> {
                    ReceiverRecord<String, String> record = recordWithLastOffset.record;
                    Acknowledgment acknowledgment = record.receiverOffset();

                    System.out.printf("Received message: topic-partition=%s offset=%d key=%s value=%s\n",
                            acknowledgment.topicPartition(),
                            acknowledgment.offset(),
                            record.key(),
                            record.value());

                    acknowledgment.acknowledge();
                });
    }

    private static class RecordWithLastOffset {
        private final ReceiverRecord<String, String> record;
        private final long lastOffset;

        public RecordWithLastOffset(ReceiverRecord<String, String> record, long lastOffset) {
            this.record = record;
            this.lastOffset = lastOffset;
        }
    }


    public static void main(String[] args) {
        String topic = "your-topic-name";
        String groupId = "your-group-id";
        String bootstrapServers = "localhost:9092";

        KafkaConsumerExample example = new KafkaConsumerExample();
        Disposable disposable = example.consumeMessages(topic, groupId, bootstrapServers);

        // 保持程序运行一段时间,以便消费消息
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 取消订阅,停止消费
        disposable.dispose();
    }
}
登录后复制

代码解释

稿定抠图
稿定抠图

AI自动消除图片背景

稿定抠图 30
查看详情 稿定抠图
  1. 配置 Consumer 属性: 设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = earliest 确保从 Topic 的起始位置开始消费。
  2. 创建 ReceiverOptions: 使用配置的 Consumer 属性创建 ReceiverOptions,并通过 subscription 指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过 seekToBeginning 将 Consumer 的 Offset 重置到起始位置。
  3. 创建 ReactiveKafkaConsumerTemplate: 使用 ReceiverOptions 创建 ReactiveKafkaConsumerTemplate,用于消费 Kafka 消息。
  4. 消费消息并停止 Consumer:
    • kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux<ReceiverRecord<String, String>>。
    • flatMap: 对于每个接收到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。
    • map: 将ReceiverRecord和获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。
    • takeUntil: 使用 takeUntil 操作符,当消费到最新 Offset 的前一个位置时,停止消费。record.offset() >= (lastOffset - 1) 判断当前消息的 Offset 是否已经达到或超过了最新 Offset 的前一个位置。
    • subscribe: 订阅 Flux,处理接收到的消息。在 subscribe 方法中,可以执行消息处理逻辑,并使用 record.receiverOffset().acknowledge() 提交 Offset。
  5. 取消订阅: 使用 disposable.dispose() 取消订阅,停止 Consumer。

注意事项

  • 示例代码假设 Topic 只有一个 Partition。如果 Topic 有多个 Partition,需要根据实际情况进行调整。
  • endOffsets 方法返回的是一个 Map<TopicPartition, Long>,其中 Long 值是每个 Partition 的最新 Offset。
  • Offset 的提交方式有多种,示例代码中使用的是手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。
  • 在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。

总结

通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。

以上就是使用 Reactor Kafka 消费指定范围消息后停止 Consumer的详细内容,更多请关注php中文网其它相关文章!

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号