
本文介绍了如何使用 Reactor Kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。
在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。
代码示例
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
public class KafkaConsumerExample {
public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) {
TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition
// 配置 Consumer 属性
Map<String, Object> consumerProps = Map.of(
"bootstrap.servers", bootstrapServers,
"group.id", groupId,
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"auto.offset.reset", "earliest" // 从最早的 Offset 开始消费
);
// 创建 ReceiverOptions
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));
// 创建 ReactiveKafkaConsumerTemplate
ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
// 消费消息并停止 Consumer
return kafkaConsumer
.receive()
.flatMap(record -> {
// 获取当前 Partition 的最新 Offset
Mono<Map<TopicPartition, Long>> endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition)));
return endOffsetsMono.map(topicPartitionToLastOffset -> {
long lastOffset = topicPartitionToLastOffset.get(topicPartition);
return new RecordWithLastOffset(record, lastOffset);
});
})
.takeUntil(recordWithLastOffset -> recordWithLastOffset.record.offset() >= (recordWithLastOffset.lastOffset - 1))
.subscribe(recordWithLastOffset -> {
ReceiverRecord<String, String> record = recordWithLastOffset.record;
Acknowledgment acknowledgment = record.receiverOffset();
System.out.printf("Received message: topic-partition=%s offset=%d key=%s value=%s\n",
acknowledgment.topicPartition(),
acknowledgment.offset(),
record.key(),
record.value());
acknowledgment.acknowledge();
});
}
private static class RecordWithLastOffset {
private final ReceiverRecord<String, String> record;
private final long lastOffset;
public RecordWithLastOffset(ReceiverRecord<String, String> record, long lastOffset) {
this.record = record;
this.lastOffset = lastOffset;
}
}
public static void main(String[] args) {
String topic = "your-topic-name";
String groupId = "your-group-id";
String bootstrapServers = "localhost:9092";
KafkaConsumerExample example = new KafkaConsumerExample();
Disposable disposable = example.consumeMessages(topic, groupId, bootstrapServers);
// 保持程序运行一段时间,以便消费消息
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 取消订阅,停止消费
disposable.dispose();
}
}代码解释
注意事项
总结
通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。
以上就是使用 Reactor Kafka 消费指定范围消息后停止 Consumer的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号