
在 reactor 中,`mergewith()` 不会原地修改 flux,而是返回新实例;需用 `flatmap` 或 `fold` 链式组合多个 flux,避免误用可变变量导致空流。
在使用 Project Reactor 构建响应式数据流时,一个常见误区是试图像传统集合操作一样“累加” Flux 实例。例如,以下代码看似合理,实则无效:
val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
val events: Flux = eventStore.readEvents(id)
allEventFlux.mergeWith(events) // ❌ 无副作用!返回新 Flux,但被直接丢弃
}
// allEventFlux 仍为 Flux.empty() —— 始终为空 根本原因在于:mergeWith() 是纯函数式操作,它不修改原 Flux,而是返回一个合并后的新 Flux。而 Kotlin 中 val 声明的变量不可重赋值,即使使用 var,上述写法也未将返回值重新赋给 allEventFlux,因此逻辑完全失效。
✅ 正确做法有两种主流方案,均符合响应式编程不可变、声明式原则:
方案一:推荐 — 使用 flatMap(并发合并,语义清晰)
适用于 ID 列表已知、且 readEvents(id) 返回的 Flux 可并行处理的场景(如异步读取事件流):
val allEventFlux: Flux= Flux.fromIterable(repository.findIds()) .map { it.ekycId } .flatMap { id -> eventStore.readEvents(id) } // ✅ 自动合并所有子流,扁平化为单一流
flatMap 会为每个 ID 订阅其对应的 Flux
方案二:按序合并 — 使用 fold + mergeWith
若需严格保持 ID 顺序、或要求各子流串行执行(如避免资源竞争),可用 fold 累积合并:
val ids = repository.findIds().map { it.ekycId }
val allEventFlux: Flux = ids.fold(Flux.empty()) { acc, id ->
acc.mergeWith(eventStore.readEvents(id)) // ✅ 每次返回新合并流,作为下一轮 acc
} ⚠️ 注意:mergeWith 在此上下文中是惰性组合,最终订阅时才会触发所有子流;但 fold 本身不引入并发,各 readEvents(id) 仍可能并发执行(取决于 eventStore 实现)。如需强制串行,请改用 concatMap:
Flux.fromIterable(ids)
.concatMap { eventStore.readEvents(it) } // ✅ 严格按 ID 顺序逐个订阅,前一个完成后再执行下一个总结与最佳实践
- ❌ 避免在循环中调用 mergeWith 却不接收返回值;
- ✅ 优先使用 flatMap 实现高效、声明式的多流合并;
- ✅ 若需顺序控制,选用 concatMap(串行)或 mergeWith + fold(惰性累积);
- ? 所有操作符均返回新 Flux,响应式链必须“一气呵成”,中间结果务必参与后续链式调用或显式赋值。
通过理解 Reactor 的不可变性与组合性,可写出更健壮、可维护的响应式数据流逻辑。









