首页 > Java > java教程 > 正文

Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程

DDD
发布: 2025-12-03 17:10:52
原创
486人浏览过

reactor中非阻塞地聚合两个flux结果为单个mono的教程

本文将深入探讨在Project Reactor中如何高效、非阻塞地将两个独立的`Flux`流的聚合结果合并成一个`Mono`对象。通过详细分析传统阻塞方法的不足,并引入`zip`操作符,我们将演示如何利用`Mono.zipWith`将两个`Flux`转换为`Mono`,进而安全地组合这些列表,最终生成一个包含所有数据的`Mono`响应式对象,从而保持整个应用程序的响应式和非阻塞特性。

响应式编程框架Project Reactor中,我们经常会遇到需要从多个异步数据源获取数据,并将这些数据聚合成一个单一结果的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个统一的Payments对象中。本教程将指导您如何使用Reactor的zip操作符,以非阻塞的方式实现这一目标。

1. 理解问题:传统阻塞方法的陷阱

假设我们有以下领域模型,用于存储支付结果:

package org.example;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.List;

@Getter
@Builder
@ToString
public class Payments {

    private List<SuccessAccount> successAccounts;
    private List<FailedAccount> failedAccounts;

    @Getter
    @Builder
    @ToString
    public static class SuccessAccount {
        private String name;
        private String accountNumber;
    }

    @Getter
    @Builder
    @ToString
    public static class FailedAccount {
        private String name;
        private String accountNumber;
        private String errorCode;
    }
}
登录后复制

我们有两个方法分别返回成功账户和失败账户的Flux流:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class Main {

    public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
        return Flux.just(Payments.SuccessAccount.builder()
                        .accountNumber("1234345")
                        .name("Payee1")
                        .build(),
                Payments.SuccessAccount.builder()
                        .accountNumber("83673674")
                        .name("Payee2")
                        .build());
    }

    public static Flux<Payments.FailedAccount> getAccountsFailed() {
        return Flux.just(Payments.FailedAccount.builder()
                        .accountNumber("12234345")
                        .name("Payee3")
                        .errorCode("8938")
                        .build(),
                Payments.FailedAccount.builder()
                        .accountNumber("3342343")
                        .name("Payee4")
                        .errorCode("8938")
                        .build());
    }
    // ... main method and getPaymentData will be added later
}
登录后复制

初学者可能会尝试通过收集每个Flux的列表,然后手动构建Payments对象。以下是一个常见的错误示例:

// 错误示例:阻塞式操作
public static Mono<Payments> getPaymentDataIncorrect() {
    Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
    Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();

    List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
    List<Payments.FailedAccount> failedAccounts = new ArrayList<>();

    // 这里的 subscribe() 调用是阻塞的,它会等待 Flux 完成并填充列表
    // 这破坏了响应式流的非阻塞特性,并且无法在响应式链中无缝衔接
    accountsFailed.collectList().subscribe(failedAccounts::addAll);
    accountsSucceeded.collectList().subscribe(successAccounts::addAll);

    return Mono.just(Payments.builder()
            .failedAccounts(failedAccounts)
            .successAccounts(successAccounts)
            .build());
}
登录后复制

上述代码中,collectList().subscribe()虽然可以获取到列表数据,但subscribe()方法本身是阻塞的,它会暂停当前线程直到Flux发出所有元素。这不仅违背了响应式编程的非阻塞原则,也使得getPaymentDataIncorrect方法在Mono.just()返回之前就已经同步地完成了数据收集,无法发挥响应式流的异步优势。在WebFlux等响应式框架中,这种阻塞操作会导致服务器线程池耗尽,严重影响系统吞吐量。

2. 响应式解决方案:zip操作符

Project Reactor提供了一系列强大的操作符来处理这种并行组合多个流的场景,其中zip操作符是解决此问题的理想选择。zip操作符会等待所有上游Publisher都发出一个元素,然后将这些元素通过一个提供的BiFunction(或Function)组合成一个新的元素。

对于将多个Flux的聚合结果合并成一个Mono,我们通常会遵循以下步骤:

课游记AI
课游记AI

AI原生学习产品

课游记AI 86
查看详情 课游记AI

2.1 将 Flux 转换为 Mono<List>

首先,我们需要将每个Flux流转换为一个Mono,该Mono在流完成后会发出一个包含所有元素的List。这可以通过collectList()操作符实现:

Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList();
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();
登录后复制

collectList()操作符本身是响应式的,它会创建一个新的Mono。当原始Flux完成时,该Mono会发出一个包含所有收集到的元素的List。这个过程是非阻塞的,它将Flux的多个元素聚合为Mono的单个List元素。

2.2 使用 Mono.zipWith 组合 Mono

一旦我们有了两个Mono<List>,我们就可以使用Mono.zipWith(或静态方法Mono.zip)来组合它们。zipWith方法接受另一个Mono作为参数,以及一个BiFunction来定义如何组合这两个Mono发出的结果:

public static Mono<Payments> getPaymentData() {
    Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
    Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();

    Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList();
    Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();

    Mono<Payments> combinedPaymentsMono = failedAccountsMono.zipWith(
            successAccountsMono,
            (failedList, successList) -> Payments.builder()
                    .failedAccounts(failedList)
                    .successAccounts(successList)
                    .build()
    );
    return combinedPaymentsMono;
}
登录后复制

在这个zipWith调用中:

  • failedAccountsMono是源Mono,它会发出失败账户列表。
  • successAccountsMono是与之组合的Mono,它会发出成功账户列表。
  • BiFunction (failedList, successList) -> Payments.builder()... 定义了如何将这两个Mono发出的List作为参数,组合成最终的Payments对象。

整个操作链是完全非阻塞和响应式的。combinedPaymentsMono将会在两个源Mono都发出其List结果后,才通过BiFunction创建并发出最终的Payments对象。

3. 完整示例代码

为了更清晰地展示,以下是包含领域模型、数据源以及正确getPaymentData方法的完整代码示例:

package org.example;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class Main {
    public static void main(String[] args) {
        // 订阅并打印最终的 Payments 对象
        getPaymentData().subscribe(
            System.out::println, // onNext:
登录后复制

以上就是Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号