0

0

Reactor中非阻塞地聚合多个Flux结果为单个Mono的策略

霞舞

霞舞

发布时间:2025-12-03 15:24:27

|

470人浏览过

|

来源于php中文网

原创

Reactor中非阻塞地聚合多个Flux结果为单个Mono的策略

本文旨在探讨如何在project reactor框架中,以非阻塞的方式将两个独立的`flux`数据流的聚合结果合并为一个单一的`mono`对象。通过分析传统阻塞方法的不足,文章将重点介绍`mono.zipwith`操作符及其与`flux.collectlist()`的结合使用,以构建一个完全响应式、高效且易于维护的数据聚合解决方案,并提供详细的代码示例和最佳实践建议。

引言:响应式数据流聚合的挑战

在现代异步编程中,尤其是在基于Project Reactor等响应式框架构建的系统中,我们经常面临需要从多个独立的异步源获取数据,并将这些数据聚合成一个单一的、结构化的复合对象的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个支付结果对象中。

这种聚合操作的关键在于保持整个处理流程的非阻塞性。如果在此过程中引入任何阻塞操作,将可能导致线程资源浪费、系统吞吐量下降,并违背响应式编程的核心理念。

领域模型概览

为了更好地理解问题和解决方案,我们首先定义一个示例领域模型Payments,它包含成功账户和失败账户的列表:

package org.example;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.List;

@Getter
@Builder
@ToString
public class Payments {
    private List successAccounts;
    private List failedAccounts;

    @Getter
    @Builder
    @ToString
    public static class SuccessAccount {
        private String name;
        private String accountNumber;
    }

    @Getter
    @Builder
    @ToString
    public static class FailedAccount {
        private String name;
        private String accountNumber;
        private String errorCode;
    }
}

我们的目标是获取两个独立的Flux流(一个产生SuccessAccount,另一个产生FailedAccount),然后将它们各自收集成列表,最终封装进一个Payments对象,并且整个过程是非阻塞的。

问题剖析:为何传统方法会阻塞

初学者在尝试聚合多个响应式流时,可能会不自觉地引入阻塞操作。考虑以下尝试将两个Flux收集为列表并构建Payments对象的代码:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        getPaymentData().subscribe(System.out::println);
    }

    public static Mono getPaymentData() {
        Flux accountsSucceeded = getAccountsSucceeded();
        Flux accountsFailed = getAccountsFailed();

        List successAccounts = new ArrayList<>();
        List failedAccounts = new ArrayList<>();

        // 这里的subscribe调用是问题所在
        accountsFailed.collectList().subscribe(failedAccounts::addAll); // 阻塞或导致竞态条件
        accountsSucceeded.collectList().subscribe(successAccounts::addAll); // 阻塞或导致竞态条件

        return Mono.just(Payments.builder()
                .failedAccounts(failedAccounts)
                .successAccounts(successAccounts)
                .build());
    }

    // ... getAccountsSucceeded() 和 getAccountsFailed() 方法省略 ...
}

上述代码段中,accountsFailed.collectList().subscribe(failedAccounts::addAll); 和 accountsSucceeded.collectList().subscribe(successAccounts::addAll); 是问题的根源。

  1. subscribe() 的作用: subscribe() 是一个终端操作,它会触发响应式流的执行。然而,它本身是异步的,意味着当subscribe()被调用时,流的元素并不会立即被收集到failedAccounts或successAccounts列表中。
  2. 打破响应式链: 在subscribe()调用之后,紧接着的return Mono.just(...)会立即执行。此时,failedAccounts和successAccounts列表很可能仍然是空的,因为它们的填充是在异步的subscribe回调中进行的。这导致Payments对象被构建时包含了空列表,或者如果流处理需要时间,则会因为尝试同步获取异步结果而导致阻塞(尽管此处代码本身不会阻塞主线程,但它无法正确获取异步结果)。
  3. 非响应式: 这种模式实际上将响应式流的异步结果拉回到命令式代码中处理,破坏了整个操作的响应式特性。为了正确等待结果,开发者可能会引入block()操作,从而彻底失去了非阻塞的优势。

解决方案:使用Mono.zipWith实现非阻塞聚合

Project Reactor提供了zip系列操作符来解决这种并发聚合问题。Mono.zipWith(或静态方法Mono.zip)是专门用于将两个或多个Mono的结果合并成一个新Mono的强大工具

ClipDrop
ClipDrop

Stability.AI出品的图片处理系列工具(背景移除、图片放大、打光)

下载

其核心思想是:当所有参与zip操作的源Mono都成功发出其元素时,zip操作符会收集这些元素,并将它们作为参数传递给一个提供的BiFunction(或Function),该函数负责将这些元素组合成一个新的结果,然后由zip返回的Mono发出这个新结果。

在我们的场景中,我们需要将两个Flux转换为Mono,然后对这两个Mono进行zip操作。Flux.collectList()操作符正是为此而生,它将一个Flux转换为一个Mono>。

以下是使用Mono.zipWith的正确实现:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        getPaymentData().subscribe(System.out::println);
    }

    public static Mono getPaymentData() {
        Flux accountsSucceededFlux = getAccountsSucceeded();
        Flux accountsFailedFlux = getAccountsFailed();

        // 1. 将Flux转换为Mono
        Mono> failedAccountsMono = accountsFailedFlux.collectList();
        Mono> successAccountsMono = accountsSucceededFlux.collectList();

        // 2. 使用zipWith合并两个Mono的结果
        return failedAccountsMono.zipWith(
                successAccountsMono,
                (failedAccounts, successAccounts) -> Payments.builder()
                        .failedAccounts(failedAccounts)
                        .successAccounts(successAccounts)
                        .build()
        );
    }

    // 模拟获取成功账户的Flux
    public static Flux getAccountsSucceeded() {
        return Flux.just(Payments.SuccessAccount.builder()
                        .accountNumber("1234345")
                        .name("Payee1")
                        .build(),
                Payments.SuccessAccount.builder()
                        .accountNumber("83673674")
                        .name("Payee2")
                        .build());
    }

    // 模拟获取失败账户的Flux
    public static Flux getAccountsFailed() {
        return Flux.just(Payments.FailedAccount.builder()
                        .accountNumber("12234345")
                        .name("Payee3")
                        .errorCode("8938")
                        .build(),
                Payments.FailedAccount.builder()
                        .accountNumber("3342343")
                        .name("Payee4")
                        .errorCode("8938")
                        .build());
    }
}

代码解析:

  1. failedAccountsFlux.collectList(): 这个操作符将Flux转换成一个Mono>。这意味着当原始的Flux发出所有元素并完成时,collectList()会收集这些元素到一个列表中,并将这个列表作为单一元素由返回的Mono发出。
  2. failedAccountsMono.zipWith(successAccountsMono, ...):
    • failedAccountsMono是第一个源Mono,它会发出失败账户列表。
    • successAccountsMono是第二个源Mono,它会发出成功账户列表。
    • BiFunction (failedAccounts, successAccounts) -> ... 是一个组合函数。当两个源Mono都成功发出它们的列表时,zipWith会调用这个函数,将两个列表作为参数传入。
    • 在这个函数内部,我们使用Payments.builder()来构建最终的Payments对象,将两个列表分别设置到failedAccounts和successAccounts字段中。
  3. 非阻塞性: 整个链条都是非阻塞的。zipWith会等待两个上游Mono都完成后才执行组合函数,并且它本身返回一个Mono,允许消费者在它发出结果时进行订阅和处理,而无需在中间环节阻塞。

总结与最佳实践

  1. 保持响应式链条完整: 在Reactor中,避免在中间操作中调用subscribe()。subscribe()应该作为整个响应式链的最后一个操作,用于触发执行并处理最终结果。在链条中间,应使用各种操作符(如map、flatMap、filter、zip等)来转换和组合流。
  2. 利用zip系列操作符: 当需要将多个独立的异步结果聚合成一个复合结果时,Mono.zip或Flux.zip是理想的选择。它们确保所有依赖的异步操作都完成后才进行组合,同时保持非阻塞。
  3. collectList()的重要性: Flux.collectList()是将Flux转换为Mono的关键操作。这在需要将一个元素流聚合成一个集合,并进一步参与Mono操作(如zipWith)时非常有用。
  4. 错误处理: zip操作符具有“快速失败”的特性。如果任何一个参与zip的源Mono发出错误,那么整个zip操作返回的Mono也会立即发出该错误,而不会等待其他源完成。在实际应用中,应考虑如何使用onErrorResume、doOnError等操作符进行错误处理。

通过遵循这些原则,开发者可以构建出高效、健壮且完全响应式的应用程序,充分利用Project Reactor带来的并发和非阻塞优势。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

36

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

59

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

37

2025.11.27

function是什么
function是什么

function是函数的意思,是一段具有特定功能的可重复使用的代码块,是程序的基本组成单元之一,可以接受输入参数,执行特定的操作,并返回结果。本专题为大家提供function是什么的相关的文章、下载、课程内容,供大家免费下载体验。

478

2023.08.04

js函数function用法
js函数function用法

js函数function用法有:1、声明函数;2、调用函数;3、函数参数;4、函数返回值;5、匿名函数;6、函数作为参数;7、函数作用域;8、递归函数。本专题提供js函数function用法的相关文章内容,大家可以免费阅读。

163

2023.10.07

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

72

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号