0

0

Mutiny响应式编程:异步处理Uni中元素的两种方法

花韻仙語

花韻仙語

发布时间:2025-09-27 11:14:39

|

766人浏览过

|

来源于php中文网

原创

Mutiny响应式编程:异步处理Uni中元素的两种方法

本文探讨了在Mutiny响应式编程中,如何高效地异步处理Uni类型中的每个元素。通过将Uni转换为Multi流,并利用transformToUniAndMerge操作符实现并发处理,我们展示了两种关键策略:一种是结合Vert.x Unit进行异步测试管理,另一种是使用collect().asList().await().indefinitely()阻塞并收集所有结果,从而避免程序过早终止的问题。

异步处理Uni的挑战

在mutiny中,当我们拥有一个uni>类型的响应式流,并希望对列表中的每一个t元素进行异步处理时,直接使用map操作将列表中的每个元素转换为新的uni,然后尝试通过flatmap合并这些uni,可能会遇到程序过早终止的问题。例如,原始尝试的代码片段:

Uni> listUni = Uni.createFrom().item(List.of("a", "b", "c"));

listUni.map(strings -> strings.stream().map(this::processItem).toList()) // 将List映射为List>
       .flatMap(unis -> Uni.join().all(unis).andCollectFailures()) // 合并所有Uni
       .subscribe()
       .with(System.out::println);

// 模拟异步处理的方法
private Uni processItem(String item) {
    // 假设这里是一个耗时的异步操作
    return Uni.createFrom().item(item)
               .onItem().delayIt().by(Duration.ofSeconds(1))
               .invoke(() -> System.out.println("Processing item: " + item))
               .replaceWithVoid();
}

这种方法的问题在于,subscribe().with()本身是异步的。如果主程序没有显式地等待这些异步操作完成,它可能会在所有Uni完成之前退出,导致看起来只有部分(甚至没有)元素被处理。为了确保所有异步操作都能执行完毕,我们需要一种机制来管理这些异步流的生命周期。

方案一:结合Multi和Vert.x Unit进行异步测试

在测试环境中,尤其是使用Vert.x Unit等框架时,我们可以利用其提供的异步上下文来管理响应式流的生命周期。核心思想是将Uni>转换为Multi,从而能够逐个处理列表中的元素,并使用transformToUniAndMerge实现并发处理。

1. 从Uni到Multi的转换

Uni>代表一个单一的、未来会产生一个列表的事件。要对列表中的每个元素进行流式处理,我们需要将其转换为Multi。onItem().transformToMulti(Multi.createFrom()::iterable)是实现这一转换的有效方式。

2. 使用transformToUniAndMerge实现并发

一旦拥有了Multi,我们可以使用onItem().transformToUniAndMerge()操作符。这个操作符的特点是,它会为Multi中的每个元素创建一个新的Uni,并并行地订阅这些Uni,然后将它们的发出项合并到一个新的Multi中。这正是我们实现异步并发处理所需要的。

3. Vert.x Unit Async 管理测试生命周期

在测试方法中,Vert.x Unit的TestContext提供了一个Async对象。通过在流的onTermination()回调中调用async.complete(),我们可以明确地告诉测试框架,当所有流操作完成(无论成功或失败)时,测试可以结束。这确保了主程序(或测试)会等待所有异步任务完成。

Artbreeder
Artbreeder

创建令人惊叹的插画和艺术

下载

示例代码

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {

    private final ExecutorService executor = Executors.newFixedThreadPool(3); // 用于模拟耗时操作的线程池

    // 模拟一个耗时的异步处理方法
    private Uni processItemAsync(String item) {
        Random random = new Random();
        int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
        System.out.println("Starting processing for '" + item + "' with duration " + duration + "ms on thread: " + Thread.currentThread().getName());

        // 使用Uni.createFrom().future来集成外部异步任务
        return Uni.createFrom().future(() -> executor.submit(() -> {
            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Finished processing for '" + item + "' on thread: " + Thread.currentThread().getName());
            return item.toUpperCase(); // 返回处理后的结果
        }));
    }

    @Test
    public void testAsyncProcessingWithVertxUnit(VertxTestContext testContext) {
        // 创建一个Uni>
        Uni.createFrom().item(List.of("a", "b", "c", "d"))
                // 将Uni>转换为Multi,以便逐个处理元素
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                // 对Multi中的每个元素应用异步转换,并合并结果
                .onItem().transformToUniAndMerge(this::processItemAsync)
                // 订阅并处理每个完成的项
                .onItem().invoke(s -> System.out.println("Received processed item: " + s))
                // 当流终止时(所有元素处理完毕或发生错误),通知测试上下文
                .onTermination().invoke((throwable, isCancelled) -> {
                    if (throwable != null) {
                        testContext.fail(throwable); // 如果有错误,测试失败
                    } else {
                        System.out.println("All items processed successfully.");
                        testContext.completeNow(); // 所有任务完成,测试成功
                    }
                    executor.shutdown(); // 关闭线程池
                })
                .subscribe()
                .with(
                        item -> System.out.println("Printing final result: " + item), // 实际的订阅者处理
                        testContext::fail // 错误处理
                );
    }
}

代码解释:

  • @ExtendWith(VertxExtension.class) 启用Vert.x JUnit 5扩展。
  • testContext.completeNow() 和 testContext.fail(throwable) 用于管理测试的异步完成状态。
  • processItemAsync 方法模拟了一个真实的异步操作,它返回一个Uni
  • onItem().transformToMulti(Multi.createFrom()::iterable) 将Uni>转换为Multi,使得每个字符串成为一个独立的流事件。
  • onItem().transformToUniAndMerge(this::processItemAsync) 是关键。它为Multi中的每个字符串创建一个新的Uni(通过processItemAsync),并并发地订阅这些Uni。所有这些Uni的结果会合并回一个新的Multi中,按它们完成的顺序发出。
  • onTermination().invoke(...) 确保无论流是成功完成还是因错误终止,testContext都能得到通知,从而正确结束测试。

方案二:收集结果并阻塞等待

在非测试场景或需要程序阻塞直到所有异步操作完成并收集结果时,我们可以使用collect().asList().await().indefinitely()。这种方法会等待所有并发处理的Uni完成,并将它们的输出收集到一个List中,然后阻塞当前线程直到这个List可用。

示例代码

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncListProcessingBlocking {

    private static final ExecutorService executor = Executors.newFixedThreadPool(3); // 用于模拟耗时操作的线程池

    // 模拟一个耗时的异步处理方法
    private Uni processItemAsync(String item) {
        Random random = new Random();
        int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
        System.out.println("Starting processing for '" + item + "' with duration " + duration + "ms on thread: " + Thread.currentThread().getName());

        return Uni.createFrom().future(() -> executor.submit(() -> {
            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Finished processing for '" + item + "' on thread: " + Thread.currentThread().getName());
            return item.toUpperCase();
        }));
    }

    public static void main(String[] args) {
        AsyncListProcessingBlocking processor = new AsyncListProcessingBlocking();

        System.out.println("Starting main processing...");

        List results = Uni.createFrom().item(List.of("x", "y", "z", "w"))
                // 将Uni>转换为Multi
                .onItem().transformToMulti(Multi.createFrom()::iterable)
                // 对Multi中的每个元素应用异步转换,并合并结果
                .onItem().transformToUniAndMerge(processor::processItemAsync)
                // 打印每个完成的项(可选)
                .onItem().invoke(s -> System.out.println("Intermediate result received: " + s))
                // 收集所有结果到一个列表中,并阻塞当前线程直到所有结果可用
                .collect().asList()
                .await().indefinitely(); // 阻塞等待,直到所有结果收集完毕

        System.out.println("All items processed. Final collected results: " + results);
        executor.shutdown(); // 关闭线程池
        System.out.println("Main processing finished.");
    }
}

代码解释:

  • 与方案一类似,我们首先将Uni>转换为Multi,然后使用onItem().transformToUniAndMerge()进行并发处理。
  • collect().asList() 操作符会将Multi中发出的所有项收集到一个List中,并将其包装在一个Uni>中。
  • await().indefinitely() 是一个阻塞操作。它会阻塞当前线程,直到上游的Uni发出其项(即所有元素都被处理并收集到列表中)。一旦Uni发出项,await().indefinitely()就会返回这个项。
  • 这种方法适用于需要同步获取所有异步操作结果的场景,但请注意,它会阻塞调用线程。

核心概念与注意事项

  1. Uni与Multi的选择:
    • Uni代表一个单一的、异步产生的结果。
    • Multi代表一个可以产生零个、一个或多个结果的异步流。
    • 当需要对集合中的每个元素进行独立处理时,将Uni>转换为Multi是关键步骤。
  2. transformToUniAndMerge的并发特性:
    • 这个操作符是实现并发处理的核心。它会为每个流元素创建并订阅一个新的Uni,这些Uni会并行执行。
    • 结果的发出顺序可能与原始列表中的顺序不同,而是取决于各个Uni完成的速度。如果需要保持顺序,可以考虑使用transformToUniAndConcatenate或在收集结果后进行排序。
  3. 异步操作的终止管理:
    • 在Mutiny中,subscribe()是非阻塞的。如果主程序没有显式地等待异步流完成,它可能会在所有操作完成之前退出。
    • 在测试场景中,Vert.x Unit的TestContext和Async对象提供了一种优雅的方式来管理测试的生命周期。
    • 在应用程序代码中,如果需要等待所有结果,collect().asList().await().indefinitely()提供了一种阻塞等待的机制。
  4. 资源管理:
    • 如果processItemAsync内部使用了线程池(如ExecutorService),请确保在所有异步任务完成后关闭这些资源,以避免资源泄露。在示例中,我们都在流终止或主程序结束时调用了executor.shutdown()。

总结

异步处理Uni中的元素是响应式编程中的常见需求。通过将Uni>转换为Multi,并结合onItem().transformToUniAndMerge()操作符,我们可以高效地实现元素的并发处理。根据具体的应用场景,可以选择结合Vert.x Unit进行异步测试管理,或使用collect().asList().await().indefinitely()阻塞并收集所有结果。理解这些模式和Mutiny操作符的特性,对于构建健壮且高效的响应式应用程序至关重要。

相关文章

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
软件测试常用工具
软件测试常用工具

软件测试常用工具有Selenium、JUnit、Appium、JMeter、LoadRunner、Postman、TestNG、LoadUI、SoapUI、Cucumber和Robot Framework等等。测试人员可以根据具体的测试需求和技术栈选择适合的工具,提高测试效率和准确性 。

436

2023.10.13

java测试工具有哪些
java测试工具有哪些

java测试工具有JUnit、TestNG、Mockito、Selenium、Apache JMeter和Cucumber。php还给大家带来了java有关的教程,欢迎大家前来学习阅读,希望对大家能有所帮助。

296

2023.10.23

Java 单元测试
Java 单元测试

本专题聚焦 Java 在软件测试与持续集成流程中的实战应用,系统讲解 JUnit 单元测试框架、Mock 数据、集成测试、代码覆盖率分析、Maven 测试配置、CI/CD 流水线搭建(Jenkins、GitHub Actions)等关键内容。通过实战案例(如企业级项目自动化测试、持续交付流程搭建),帮助学习者掌握 Java 项目质量保障与自动化交付的完整体系。

19

2025.10.24

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

315

2023.08.02

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

254

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

206

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1463

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

617

2023.11.24

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

36

2026.01.14

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.5万人学习

C# 教程
C# 教程

共94课时 | 6.7万人学习

Java 教程
Java 教程

共578课时 | 46万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号