
本文详解如何在 project reactor 中正确合并多个 flux 数据流,纠正 `mergewith` 误用导致数据丢失的问题,并提供基于 `flatmap` 和 `fold` 的两种可靠实现方案。
在使用 Project Reactor 进行响应式编程时,一个常见误区是将 Flux.mergeWith() 当作“就地合并”操作——实际上,它返回一个全新的 Flux 实例,而非修改原对象。因此,如下代码无法达到预期效果:
val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
val events: Flux = eventStore.readEvents(id)
allEventFlux.mergeWith(events) // ❌ 错误!返回新 Flux,但未赋值,原 allEventFlux 仍为空
} 这段代码中,allEventFlux 始终保持为 Flux.empty(),因为每次调用 mergeWith 产生的新流都被直接丢弃。
✅ 推荐方案一:使用 flatMap(最简洁、高效且符合响应式语义)
当每个 ID 对应一个事件流(Flux
val allEvents: Flux= Flux.fromIterable(repository.findIds()) .map { it.ekycId } .flatMap { id -> eventStore.readEvents(id) } // ✅ 自动合并所有子流,支持背压与并发控制
flatMap 不仅语义清晰,还天然支持异步、背压和并发(默认 concurrency=256,可通过重载参数调整),是处理“一对多”响应式映射的标准方式。
✅ 推荐方案二:使用 fold + mergeWith(需严格顺序合并)
若业务要求严格按 ID 列表顺序串行合并各流(即前一个流完全完成后再订阅下一个),可借助 Kotlin 的 fold 累积构建:
val ids = repository.findIds().map { it.ekycId }
val allEvents: Flux = ids.fold(Flux.empty()) { acc, id ->
acc.mergeWith(eventStore.readEvents(id)) // ✅ 每次将新流合并进累积结果
} ⚠️ 注意:此方式本质是链式 mergeWith,最终生成一个 Flux.merge(flux1, flux2, ..., fluxN) 等效结构,但不保证并发执行,且大量 ID 可能导致栈深度增加;生产环境建议优先使用 flatMap,仅在强顺序依赖场景下选用 fold。
? 额外提醒:
- 避免在响应式链中混用阻塞式集合操作(如 for 循环 + 可变变量),这违背响应式编程原则;
- mergeWith 适用于已知少量固定流的合并;动态批量合并请交由 Flux.merge() 或更高阶操作符(如 flatMap/concatMap)处理;
- 如需去重、限流或错误隔离,可在 flatMap 内添加 .onErrorResume()、.distinct() 等操作符增强健壮性。
掌握 mergeWith 的不可变特性与 flatMap 的声明式合并能力,是写出高效、可维护响应式代码的关键一步。










