
在spring boot应用程序中,当一个api端点被调用时,通常期望在请求-响应周期内获得一个确定的结果。然而,当这个api端点触发或查询一个基于apache flink的流处理程序,并且该程序使用了“无界数据源”(unbounded data source)时,会遇到一个根本性的矛盾。无界数据源,顾名思义,是持续不断产生数据的,没有明确的结束点。这意味着flink作业会持续运行、持续处理数据并更新其内部聚合状态,但永远不会有一个“最终”的聚合结果。因此,在api请求的当下,无法从一个仍在运行的无界流作业中获取一个固定的、代表最终状态的聚合结果并将其作为http响应返回。
问题的核心在于:
为了解决这一矛盾,我们需要重新思考如何将Flink的流处理能力与Spring Boot的请求-响应模型结合起来。
最直接的解决方案是将原本的无界数据源在特定查询场景下转换为有界数据源。这意味着在API被调用时,我们指示Flink处理一个明确定义的数据范围,从而产生一个确定的、可返回的聚合结果。
对于像Apache Kafka这样的消息队列,我们可以通过指定起始和结束偏移量(offsets)来将其无界特性“截断”为一个有界的数据集。当Spring Boot API被调用时,它可以在内部构建一个Flink作业,该作业仅消费Kafka主题中特定范围的数据。
// 假设在Spring Boot中动态构建并提交Flink作业
public List<Tuple2<String, Long>> getAggregatedDataFromKafka(
String topic, long startOffset, long endOffset, int partition) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 生产环境建议使用RemoteEnvironment或Standalone模式
env.setRuntimeMode(RuntimeMode.BATCH); // 对于有界查询,建议设置为BATCH模式
// 构建Kafka源,指定起始和结束偏移量
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics(topic)
.setStartingOffsets(OffsetsInitializer.forSpecificOffsets(
new HashMap<TopicPartition, Long>() {{
put(new TopicPartition(topic, partition), startOffset);
}}
))
// 设置结束偏移量,将其变为有界源
.setBoundedStopOffsets(OffsetsInitializer.forSpecificOffsets(
new HashMap<TopicPartition, Long>() {{
put(new TopicPartition(topic, partition), endOffset);
}}
))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Bounded Source");
// 示例:简单词频统计
DataStream<Tuple2<String, Long>> aggregatedStream = kafkaStream
.flatMap((String value, Collector<Tuple2<String, Long>>) out -> {
for (String word : value.split(" ")) {
if (!word.isEmpty()) {
out.collect(new Tuple2<>(word, 1L));
}
}
})
.keyBy(0)
.sum(1);
// 将结果收集到List中 (适用于小规模数据,且会阻塞API调用)
List<Tuple2<String, Long>> results = new ArrayList<>();
try (CloseableIterator<Tuple2<String, Long>> it = aggregatedStream.executeAndCollect()) {
it.forEachRemaining(results::add);
}
return results;
}注意事项:
对于需要持续处理无界流并提供最新聚合结果的场景,最佳实践是让Flink作业将其聚合结果持续写入一个外部存储系统。Spring Boot应用程序的API则负责查询这个外部存储,而不是直接与运行中的Flink作业交互。
Flink作业(概念性代码):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
public class FlinkRedisSinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // 启用检查点,每60秒一次
// 假设从Kafka读取数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest()) // 无界源,从最早的可用偏移量开始
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 示例:统计每个单词的出现次数
DataStream<Tuple2<String, Long>> wordCounts = stream
.flatMap((String value, Collector<Tuple2<String, Long>>) out -> {
for (String word : value.split(" ")) {
if (!word.isEmpty()) {
out.collect(new Tuple2<>(word, 1L));
}
}
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口聚合
.sum(1);以上就是在Spring Boot中获取Flink聚合结果:无界数据源的挑战与策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号