0

0

Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程

DDD

DDD

发布时间:2025-12-03 17:10:52

|

577人浏览过

|

来源于php中文网

原创

reactor中非阻塞地聚合两个flux结果为单个mono的教程

本文将深入探讨在Project Reactor中如何高效、非阻塞地将两个独立的`Flux`流的聚合结果合并成一个`Mono`对象。通过详细分析传统阻塞方法的不足,并引入`zip`操作符,我们将演示如何利用`Mono.zipWith`将两个`Flux`转换为`Mono`,进而安全地组合这些列表,最终生成一个包含所有数据的`Mono`响应式对象,从而保持整个应用程序的响应式和非阻塞特性。

响应式编程框架Project Reactor中,我们经常会遇到需要从多个异步数据源获取数据,并将这些数据聚合成一个单一结果的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个统一的Payments对象中。本教程将指导您如何使用Reactor的zip操作符,以非阻塞的方式实现这一目标。

1. 理解问题:传统阻塞方法的陷阱

假设我们有以下领域模型,用于存储支付结果:

package org.example;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.List;

@Getter
@Builder
@ToString
public class Payments {

    private List successAccounts;
    private List failedAccounts;

    @Getter
    @Builder
    @ToString
    public static class SuccessAccount {
        private String name;
        private String accountNumber;
    }

    @Getter
    @Builder
    @ToString
    public static class FailedAccount {
        private String name;
        private String accountNumber;
        private String errorCode;
    }
}

我们有两个方法分别返回成功账户和失败账户的Flux流:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class Main {

    public static Flux getAccountsSucceeded() {
        return Flux.just(Payments.SuccessAccount.builder()
                        .accountNumber("1234345")
                        .name("Payee1")
                        .build(),
                Payments.SuccessAccount.builder()
                        .accountNumber("83673674")
                        .name("Payee2")
                        .build());
    }

    public static Flux getAccountsFailed() {
        return Flux.just(Payments.FailedAccount.builder()
                        .accountNumber("12234345")
                        .name("Payee3")
                        .errorCode("8938")
                        .build(),
                Payments.FailedAccount.builder()
                        .accountNumber("3342343")
                        .name("Payee4")
                        .errorCode("8938")
                        .build());
    }
    // ... main method and getPaymentData will be added later
}

初学者可能会尝试通过收集每个Flux的列表,然后手动构建Payments对象。以下是一个常见的错误示例:

// 错误示例:阻塞式操作
public static Mono getPaymentDataIncorrect() {
    Flux accountsSucceeded = getAccountsSucceeded();
    Flux accountsFailed = getAccountsFailed();

    List successAccounts = new ArrayList<>();
    List failedAccounts = new ArrayList<>();

    // 这里的 subscribe() 调用是阻塞的,它会等待 Flux 完成并填充列表
    // 这破坏了响应式流的非阻塞特性,并且无法在响应式链中无缝衔接
    accountsFailed.collectList().subscribe(failedAccounts::addAll);
    accountsSucceeded.collectList().subscribe(successAccounts::addAll);

    return Mono.just(Payments.builder()
            .failedAccounts(failedAccounts)
            .successAccounts(successAccounts)
            .build());
}

上述代码中,collectList().subscribe()虽然可以获取到列表数据,但subscribe()方法本身是阻塞的,它会暂停当前线程直到Flux发出所有元素。这不仅违背了响应式编程的非阻塞原则,也使得getPaymentDataIncorrect方法在Mono.just()返回之前就已经同步地完成了数据收集,无法发挥响应式流的异步优势。在WebFlux等响应式框架中,这种阻塞操作会导致服务器线程池耗尽,严重影响系统吞吐量。

2. 响应式解决方案:zip操作符

Project Reactor提供了一系列强大的操作符来处理这种并行组合多个流的场景,其中zip操作符是解决此问题的理想选择。zip操作符会等待所有上游Publisher都发出一个元素,然后将这些元素通过一个提供的BiFunction(或Function)组合成一个新的元素。

对于将多个Flux的聚合结果合并成一个Mono,我们通常会遵循以下步骤:

Autoppt
Autoppt

Autoppt:打造高效与精美PPT的AI工具

下载

2.1 将 Flux 转换为 Mono

首先,我们需要将每个Flux流转换为一个Mono,该Mono在流完成后会发出一个包含所有元素的List。这可以通过collectList()操作符实现:

Mono> failedAccountsMono = accountsFailed.collectList();
Mono> successAccountsMono = accountsSucceeded.collectList();

collectList()操作符本身是响应式的,它会创建一个新的Mono。当原始Flux完成时,该Mono会发出一个包含所有收集到的元素的List。这个过程是非阻塞的,它将Flux的多个元素聚合为Mono的单个List元素。

2.2 使用 Mono.zipWith 组合 Mono

一旦我们有了两个Mono,我们就可以使用Mono.zipWith(或静态方法Mono.zip)来组合它们。zipWith方法接受另一个Mono作为参数,以及一个BiFunction来定义如何组合这两个Mono发出的结果:

public static Mono getPaymentData() {
    Flux accountsSucceeded = getAccountsSucceeded();
    Flux accountsFailed = getAccountsFailed();

    Mono> failedAccountsMono = accountsFailed.collectList();
    Mono> successAccountsMono = accountsSucceeded.collectList();

    Mono combinedPaymentsMono = failedAccountsMono.zipWith(
            successAccountsMono,
            (failedList, successList) -> Payments.builder()
                    .failedAccounts(failedList)
                    .successAccounts(successList)
                    .build()
    );
    return combinedPaymentsMono;
}

在这个zipWith调用中:

  • failedAccountsMono是源Mono,它会发出失败账户列表。
  • successAccountsMono是与之组合的Mono,它会发出成功账户列表。
  • BiFunction (failedList, successList) -> Payments.builder()... 定义了如何将这两个Mono发出的List作为参数,组合成最终的Payments对象。

整个操作链是完全非阻塞和响应式的。combinedPaymentsMono将会在两个源Mono都发出其List结果后,才通过BiFunction创建并发出最终的Payments对象。

3. 完整示例代码

为了更清晰地展示,以下是包含领域模型、数据源以及正确getPaymentData方法的完整代码示例:

package org.example;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class Main {
    public static void main(String[] args) {
        // 订阅并打印最终的 Payments 对象
        getPaymentData().subscribe(
            System.out::println, // onNext:

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

function是什么
function是什么

function是函数的意思,是一段具有特定功能的可重复使用的代码块,是程序的基本组成单元之一,可以接受输入参数,执行特定的操作,并返回结果。本专题为大家提供function是什么的相关的文章、下载、课程内容,供大家免费下载体验。

478

2023.08.04

js函数function用法
js函数function用法

js函数function用法有:1、声明函数;2、调用函数;3、函数参数;4、函数返回值;5、匿名函数;6、函数作为参数;7、函数作用域;8、递归函数。本专题提供js函数function用法的相关文章内容,大家可以免费阅读。

163

2023.10.07

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

348

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2074

2023.08.14

vb怎么连接数据库
vb怎么连接数据库

在VB中,连接数据库通常使用ADO(ActiveX 数据对象)或 DAO(Data Access Objects)这两个技术来实现:1、引入ADO库;2、创建ADO连接对象;3、配置连接字符串;4、打开连接;5、执行SQL语句;6、处理查询结果;7、关闭连接即可。

347

2023.08.31

MySQL恢复数据库
MySQL恢复数据库

MySQL恢复数据库的方法有使用物理备份恢复、使用逻辑备份恢复、使用二进制日志恢复和使用数据库复制进行恢复等。本专题为大家提供MySQL数据库相关的文章、下载、课程内容,供大家免费下载体验。

255

2023.09.05

vb中怎么连接access数据库
vb中怎么连接access数据库

vb中连接access数据库的步骤包括引用必要的命名空间、创建连接字符串、创建连接对象、打开连接、执行SQL语句和关闭连接。本专题为大家提供连接access数据库相关的文章、下载、课程内容,供大家免费下载体验。

323

2023.10.09

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

71

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号