0

0

Reactive Stream 中正确合并多个 Flux 数据流的实践方法

花韻仙語

花韻仙語

发布时间:2026-01-13 13:21:10

|

768人浏览过

|

来源于php中文网

原创

Reactive Stream 中正确合并多个 Flux 数据流的实践方法

reactor 中,`mergewith()` 不会原地修改 flux,而是返回新实例;需用 `flatmap` 或 `fold` 链式组合多个 flux,避免误用可变变量导致空流。

在使用 Project Reactor 构建响应式数据流时,一个常见误区是试图像传统集合操作一样“累加” Flux 实例。例如,以下代码看似合理,实则无效:

val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
    val events: Flux = eventStore.readEvents(id)
    allEventFlux.mergeWith(events) // ❌ 无副作用!返回新 Flux,但被直接丢弃
}
// allEventFlux 仍为 Flux.empty() —— 始终为空

根本原因在于:mergeWith() 是纯函数式操作,它不修改原 Flux,而是返回一个合并后的新 Flux。而 Kotlin 中 val 声明的变量不可重赋值,即使使用 var,上述写法也未将返回值重新赋给 allEventFlux,因此逻辑完全失效。

✅ 正确做法有两种主流方案,均符合响应式编程不可变、声明式原则:

方案一:推荐 — 使用 flatMap(并发合并,语义清晰)

适用于 ID 列表已知、且 readEvents(id) 返回的 Flux 可并行处理的场景(如异步读取事件流):

val allEventFlux: Flux = Flux.fromIterable(repository.findIds())
    .map { it.ekycId }
    .flatMap { id -> eventStore.readEvents(id) } // ✅ 自动合并所有子流,扁平化为单一流

flatMap 会为每个 ID 订阅其对应的 Flux,并将所有事件按实际到达顺序(非严格 FIFO,但保证不丢失)合并到一个统一的流中,天然支持背压与异步调度。

Rationale
Rationale

Rationale 是一款可帮助企业主、经理和个人做出艰难的决定的AI工具

下载

方案二:按序合并 — 使用 fold + mergeWith

若需严格保持 ID 顺序、或要求各子流串行执行(如避免资源竞争),可用 fold 累积合并:

val ids = repository.findIds().map { it.ekycId }
val allEventFlux: Flux = ids.fold(Flux.empty()) { acc, id ->
    acc.mergeWith(eventStore.readEvents(id)) // ✅ 每次返回新合并流,作为下一轮 acc
}

⚠️ 注意:mergeWith 在此上下文中是惰性组合,最终订阅时才会触发所有子流;但 fold 本身不引入并发,各 readEvents(id) 仍可能并发执行(取决于 eventStore 实现)。如需强制串行,请改用 concatMap:

Flux.fromIterable(ids)
    .concatMap { eventStore.readEvents(it) } // ✅ 严格按 ID 顺序逐个订阅,前一个完成后再执行下一个

总结与最佳实践

  • ❌ 避免在循环中调用 mergeWith 却不接收返回值;
  • ✅ 优先使用 flatMap 实现高效、声明式的多流合并;
  • ✅ 若需顺序控制,选用 concatMap(串行)或 mergeWith + fold(惰性累积);
  • ? 所有操作符均返回新 Flux,响应式链必须“一气呵成”,中间结果务必参与后续链式调用或显式赋值。

通过理解 Reactor 的不可变性与组合性,可写出更健壮、可维护的响应式数据流逻辑。

相关专题

更多
php与html混编教程大全
php与html混编教程大全

本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

0

2026.01.13

PHP 高性能
PHP 高性能

本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

1

2026.01.13

MySQL数据库报错常见问题及解决方法大全
MySQL数据库报错常见问题及解决方法大全

本专题整合了MySQL数据库报错常见问题及解决方法,阅读专题下面的文章了解更多详细内容。

4

2026.01.13

PHP 文件上传
PHP 文件上传

本专题整合了PHP实现文件上传相关教程,阅读专题下面的文章了解更多详细内容。

2

2026.01.13

PHP缓存策略教程大全
PHP缓存策略教程大全

本专题整合了PHP缓存相关教程,阅读专题下面的文章了解更多详细内容。

3

2026.01.13

jQuery 正则表达式相关教程
jQuery 正则表达式相关教程

本专题整合了jQuery正则表达式相关教程大全,阅读专题下面的文章了解更多详细内容。

1

2026.01.13

交互式图表和动态图表教程汇总
交互式图表和动态图表教程汇总

本专题整合了交互式图表和动态图表的相关内容,阅读专题下面的文章了解更多详细内容。

1

2026.01.13

nginx配置文件详细教程
nginx配置文件详细教程

本专题整合了nginx配置文件相关教程详细汇总,阅读专题下面的文章了解更多详细内容。

3

2026.01.13

nginx部署php项目教程汇总
nginx部署php项目教程汇总

本专题整合了nginx部署php项目教程汇总,阅读专题下面的文章了解更多详细内容。

5

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.6万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号