
本文介绍了如何使用 Reactor Kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。
在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。
代码示例
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
public class KafkaConsumerExample {
public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) {
TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition
// 配置 Consumer 属性
Map consumerProps = Map.of(
"bootstrap.servers", bootstrapServers,
"group.id", groupId,
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"auto.offset.reset", "earliest" // 从最早的 Offset 开始消费
);
// 创建 ReceiverOptions
ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));
// 创建 ReactiveKafkaConsumerTemplate
ReactiveKafkaConsumerTemplate kafkaConsumer = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
// 消费消息并停止 Consumer
return kafkaConsumer
.receive()
.flatMap(record -> {
// 获取当前 Partition 的最新 Offset
Mono 代码解释
- 配置 Consumer 属性: 设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = earliest 确保从 Topic 的起始位置开始消费。
- 创建 ReceiverOptions: 使用配置的 Consumer 属性创建 ReceiverOptions,并通过 subscription 指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过 seekToBeginning 将 Consumer 的 Offset 重置到起始位置。
- 创建 ReactiveKafkaConsumerTemplate: 使用 ReceiverOptions 创建 ReactiveKafkaConsumerTemplate,用于消费 Kafka 消息。
-
消费消息并停止 Consumer:
- kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux
>。 - flatMap: 对于每个接收到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。
- map: 将ReceiverRecord和获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。
- takeUntil: 使用 takeUntil 操作符,当消费到最新 Offset 的前一个位置时,停止消费。record.offset() >= (lastOffset - 1) 判断当前消息的 Offset 是否已经达到或超过了最新 Offset 的前一个位置。
- subscribe: 订阅 Flux,处理接收到的消息。在 subscribe 方法中,可以执行消息处理逻辑,并使用 record.receiverOffset().acknowledge() 提交 Offset。
- kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux
- 取消订阅: 使用 disposable.dispose() 取消订阅,停止 Consumer。
注意事项
- 示例代码假设 Topic 只有一个 Partition。如果 Topic 有多个 Partition,需要根据实际情况进行调整。
- endOffsets 方法返回的是一个 Map
,其中 Long 值是每个 Partition 的最新 Offset。 - Offset 的提交方式有多种,示例代码中使用的是手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。
- 在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。
总结
通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。











