
本文将深入探讨在Project Reactor中如何高效、非阻塞地将两个独立的`Flux`流的聚合结果合并成一个`Mono`对象。通过详细分析传统阻塞方法的不足,并引入`zip`操作符,我们将演示如何利用`Mono.zipWith`将两个`Flux`转换为`Mono`,进而安全地组合这些列表,最终生成一个包含所有数据的`Mono
在响应式编程框架Project Reactor中,我们经常会遇到需要从多个异步数据源获取数据,并将这些数据聚合成一个单一结果的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个统一的Payments对象中。本教程将指导您如何使用Reactor的zip操作符,以非阻塞的方式实现这一目标。
1. 理解问题:传统阻塞方法的陷阱
假设我们有以下领域模型,用于存储支付结果:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import java.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List successAccounts;
private List failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
} 我们有两个方法分别返回成功账户和失败账户的Flux流:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static Flux getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
public static Flux getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
// ... main method and getPaymentData will be added later
} 初学者可能会尝试通过收集每个Flux的列表,然后手动构建Payments对象。以下是一个常见的错误示例:
// 错误示例:阻塞式操作 public static MonogetPaymentDataIncorrect() { Flux accountsSucceeded = getAccountsSucceeded(); Flux accountsFailed = getAccountsFailed(); List successAccounts = new ArrayList<>(); List failedAccounts = new ArrayList<>(); // 这里的 subscribe() 调用是阻塞的,它会等待 Flux 完成并填充列表 // 这破坏了响应式流的非阻塞特性,并且无法在响应式链中无缝衔接 accountsFailed.collectList().subscribe(failedAccounts::addAll); accountsSucceeded.collectList().subscribe(successAccounts::addAll); return Mono.just(Payments.builder() .failedAccounts(failedAccounts) .successAccounts(successAccounts) .build()); }
上述代码中,collectList().subscribe()虽然可以获取到列表数据,但subscribe()方法本身是阻塞的,它会暂停当前线程直到Flux发出所有元素。这不仅违背了响应式编程的非阻塞原则,也使得getPaymentDataIncorrect方法在Mono.just()返回之前就已经同步地完成了数据收集,无法发挥响应式流的异步优势。在WebFlux等响应式框架中,这种阻塞操作会导致服务器线程池耗尽,严重影响系统吞吐量。
2. 响应式解决方案:zip操作符
Project Reactor提供了一系列强大的操作符来处理这种并行组合多个流的场景,其中zip操作符是解决此问题的理想选择。zip操作符会等待所有上游Publisher都发出一个元素,然后将这些元素通过一个提供的BiFunction(或Function)组合成一个新的元素。
对于将多个Flux的聚合结果合并成一个Mono,我们通常会遵循以下步骤:
2.1 将 Flux 转换为 Mono
首先,我们需要将每个Flux流转换为一个Mono,该Mono在流完成后会发出一个包含所有元素的List。这可以通过collectList()操作符实现:
Mono> failedAccountsMono = accountsFailed.collectList(); Mono
> successAccountsMono = accountsSucceeded.collectList();
collectList()操作符本身是响应式的,它会创建一个新的Mono。当原始Flux完成时,该Mono会发出一个包含所有收集到的元素的List。这个过程是非阻塞的,它将Flux的多个元素聚合为Mono的单个List元素。
2.2 使用 Mono.zipWith 组合 Mono
一旦我们有了两个Mono,我们就可以使用Mono.zipWith(或静态方法Mono.zip)来组合它们。zipWith方法接受另一个Mono作为参数,以及一个BiFunction来定义如何组合这两个Mono发出的结果:
public static MonogetPaymentData() { Flux accountsSucceeded = getAccountsSucceeded(); Flux accountsFailed = getAccountsFailed(); Mono > failedAccountsMono = accountsFailed.collectList(); Mono
> successAccountsMono = accountsSucceeded.collectList(); Mono
combinedPaymentsMono = failedAccountsMono.zipWith( successAccountsMono, (failedList, successList) -> Payments.builder() .failedAccounts(failedList) .successAccounts(successList) .build() ); return combinedPaymentsMono; }
在这个zipWith调用中:
- failedAccountsMono是源Mono,它会发出失败账户列表。
- successAccountsMono是与之组合的Mono,它会发出成功账户列表。
- BiFunction (failedList, successList) -> Payments.builder()... 定义了如何将这两个Mono发出的List作为参数,组合成最终的Payments对象。
整个操作链是完全非阻塞和响应式的。combinedPaymentsMono将会在两个源Mono都发出其List结果后,才通过BiFunction创建并发出最终的Payments对象。
3. 完整示例代码
为了更清晰地展示,以下是包含领域模型、数据源以及正确getPaymentData方法的完整代码示例:
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public class Main {
public static void main(String[] args) {
// 订阅并打印最终的 Payments 对象
getPaymentData().subscribe(
System.out::println, // onNext:










