
本文将深入探讨在Project Reactor中如何高效、非阻塞地将两个独立的`Flux`流的聚合结果合并成一个`Mono`对象。通过详细分析传统阻塞方法的不足,并引入`zip`操作符,我们将演示如何利用`Mono.zipWith`将两个`Flux`转换为`Mono`,进而安全地组合这些列表,最终生成一个包含所有数据的`Mono
在响应式编程框架Project Reactor中,我们经常会遇到需要从多个异步数据源获取数据,并将这些数据聚合成一个单一结果的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个统一的Payments对象中。本教程将指导您如何使用Reactor的zip操作符,以非阻塞的方式实现这一目标。
假设我们有以下领域模型,用于存储支付结果:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import java.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List<SuccessAccount> successAccounts;
private List<FailedAccount> failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
}我们有两个方法分别返回成功账户和失败账户的Flux流:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
// ... main method and getPaymentData will be added later
}初学者可能会尝试通过收集每个Flux的列表,然后手动构建Payments对象。以下是一个常见的错误示例:
// 错误示例:阻塞式操作
public static Mono<Payments> getPaymentDataIncorrect() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
List<Payments.FailedAccount> failedAccounts = new ArrayList<>();
// 这里的 subscribe() 调用是阻塞的,它会等待 Flux 完成并填充列表
// 这破坏了响应式流的非阻塞特性,并且无法在响应式链中无缝衔接
accountsFailed.collectList().subscribe(failedAccounts::addAll);
accountsSucceeded.collectList().subscribe(successAccounts::addAll);
return Mono.just(Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build());
}上述代码中,collectList().subscribe()虽然可以获取到列表数据,但subscribe()方法本身是阻塞的,它会暂停当前线程直到Flux发出所有元素。这不仅违背了响应式编程的非阻塞原则,也使得getPaymentDataIncorrect方法在Mono.just()返回之前就已经同步地完成了数据收集,无法发挥响应式流的异步优势。在WebFlux等响应式框架中,这种阻塞操作会导致服务器线程池耗尽,严重影响系统吞吐量。
Project Reactor提供了一系列强大的操作符来处理这种并行组合多个流的场景,其中zip操作符是解决此问题的理想选择。zip操作符会等待所有上游Publisher都发出一个元素,然后将这些元素通过一个提供的BiFunction(或Function)组合成一个新的元素。
对于将多个Flux的聚合结果合并成一个Mono,我们通常会遵循以下步骤:
首先,我们需要将每个Flux流转换为一个Mono,该Mono在流完成后会发出一个包含所有元素的List。这可以通过collectList()操作符实现:
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList(); Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();
collectList()操作符本身是响应式的,它会创建一个新的Mono。当原始Flux完成时,该Mono会发出一个包含所有收集到的元素的List。这个过程是非阻塞的,它将Flux的多个元素聚合为Mono的单个List元素。
一旦我们有了两个Mono<List>,我们就可以使用Mono.zipWith(或静态方法Mono.zip)来组合它们。zipWith方法接受另一个Mono作为参数,以及一个BiFunction来定义如何组合这两个Mono发出的结果:
public static Mono<Payments> getPaymentData() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList();
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();
Mono<Payments> combinedPaymentsMono = failedAccountsMono.zipWith(
successAccountsMono,
(failedList, successList) -> Payments.builder()
.failedAccounts(failedList)
.successAccounts(successList)
.build()
);
return combinedPaymentsMono;
}在这个zipWith调用中:
整个操作链是完全非阻塞和响应式的。combinedPaymentsMono将会在两个源Mono都发出其List结果后,才通过BiFunction创建并发出最终的Payments对象。
为了更清晰地展示,以下是包含领域模型、数据源以及正确getPaymentData方法的完整代码示例:
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public class Main {
public static void main(String[] args) {
// 订阅并打印最终的 Payments 对象
getPaymentData().subscribe(
System.out::println, // onNext:以上就是Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号