0

0

Mutiny异步处理Uni中元素的最佳实践

心靈之曲

心靈之曲

发布时间:2025-09-27 12:48:01

|

170人浏览过

|

来源于php中文网

原创

mutiny异步处理uni中元素的最佳实践

响应式编程中,处理Uni>这类结构时,一个常见需求是将列表中的每个元素独立地进行异步操作。例如,从数据库批量查询得到一个ID列表,然后需要为每个ID调用一个外部服务。直接对Uni>进行map操作通常会将整个列表作为一个整体处理,而无法实现对列表内每个元素的并发异步处理。本文将深入探讨Mutiny提供的强大工具,帮助开发者优雅地实现这一目标,避免常见陷阱。

核心问题剖析

当面对一个Uni>并希望对列表中的每个T执行异步操作时,一个常见的误区是尝试直接通过map将List转换为List>,然后使用Uni.join().all(unis).andCollectFailures()来合并结果,最后通过subscribe()进行消费。这种方法在Mutiny的链式操作中是可行的,但如果后续没有适当的机制来保持主线程的活跃,例如在简单的main方法中,程序可能会在所有异步任务完成之前退出,导致部分或全部异步操作未能执行或其结果未被观察到。

要正确地将Uni>中的每个元素转换为一个独立的异步Uni并进行并发处理,我们需要利用Mutiny的流式处理能力,或者采用阻塞机制来等待所有操作完成。

方法一:利用Multi进行非阻塞流式处理

这种方法是Mutiny推荐的、更符合响应式编程范式的处理方式。它通过将包含列表的Uni转换为一个Multi流,然后对流中的每个元素进行异步转换和合并,实现并发处理。

原理介绍

Mutiny的Multi类型非常适合处理元素流。通过以下步骤,我们可以将Uni>转换为Multi,对流中的每个元素独立应用异步转换,并利用transformToUniAndMerge实现并发处理:

  1. Uni> 转换为 Multi: 使用onItem().transformToMulti(Multi.createFrom()::iterable)将包含列表的Uni转换为一个包含列表元素的Multi。
  2. 元素异步转换: 对每个Multi中的元素,使用onItem().transformToUniAndMerge(item -> Uni.createFrom().future(processFuture(item)))将其转换为一个代表异步操作的Uni。transformToUniAndMerge会自动处理这些Uni的并发执行和结果合并。
  3. 结果处理与流终止: transformToUniAndMerge会返回一个新的Multi,其元素是所有异步操作的结果。可以通过subscribe()消费这些结果。为了确保所有异步操作完成,特别是在非Web服务器等环境中,需要额外的机制来保持主线程运行。

代码示例

以下示例演示了如何使用线程池模拟异步操作,并结合Mutiny的Multi进行非阻塞流式处理。

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch; // 用于在main方法中等待所有异步任务完成

public class AsyncListProcessor {

    private final ExecutorService executor = Executors.newFixedThreadPool(3); // 示例用线程池,限制并发数

    // 模拟一个返回Future的耗时操作
    private Future processFuture(String s) {
        return executor.submit(() -> {
            System.out.println("开始处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(3000) + 1000); // 模拟1-4秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("处理中断", e);
            }
            System.out.println("结束处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            return s.toUpperCase(); // 假设处理后返回大写
        });
    }

    // 将Future封装成Uni
    private Uni processItemAsUni(String item) {
        return Uni.createFrom().future(processFuture(item));
    }

    public void processListReactive(List items, CountDownLatch latch) {
        System.out.println("\n--- 启动非阻塞流式处理 ---");
        Uni.createFrom()
            .item(items)
            // 将 Uni> 转换为 Multi
            .onItem().transformToMulti(Multi.createFrom()::iterable)
            // 对 Multi 中的每个元素进行异步处理,并合并结果
            .onItem().transformToUniAndMerge(this::processItemAsUni)
            // 订阅并打印每个完成的结果
            .subscribe()
            .with(
                s -> System.out.println("接收到结果 (Reactive): " + s),
                failure -> System.err.println("处理失败: " + failure.getMessage()),
                () -> {
                    System.out.println("所有非阻塞流式处理完成.");
                    latch.countDown(); // 通知主线程所有任务已完成
                }
            );
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncListProcessor processor = new AsyncListProcessor();
        List data = List.of("apple", "banana", "cherry", "date", "elderberry");

        // 使用CountDownLatch等待所有异步任务完成
        CountDownLatch latch = new CountDownLatch(1);
        processor.processListReactive(data, latch);

        // 等待所有异步任务完成
        latch.await();
        System.out.println("主线程继续执行,所有异步任务已完成或失败。");
        processor.executor.shutdown(); // 关闭线程池
    }
}

注意事项

这种方式是非阻塞的,非常适合构建响应式应用程序。它允许任务并发执行,且不会阻塞主调用线程。在非Web服务器等环境中(如简单的main方法),为了确保程序不会在异步操作完成前退出,需要使用CountDownLatch、await()或其他同步机制来等待所有任务完成。在基于Mutiny的框架(如Quarkus)中,这些通常由框架的调度器和生命周期管理。

DeepL
DeepL

DeepL是一款强大的在线AI翻译工具,可以翻译31种不同语言的文本,并可以处理PDF、Word、PowerPoint等文档文件

下载

方法二:收集并等待所有结果 (阻塞式)

如果你的需求是等待所有异步操作完成,并将它们的结果收集到一个列表中,然后才能继续执行后续逻辑,那么可以使用阻塞式的方法。

原理介绍

这种方法同样利用Multi进行元素的异步转换,但在最后阶段,它会阻塞当前线程,直到所有异步操作完成并将结果聚合到一个列表中。

  1. Uni> 转换为 Multi: 同方法一。
  2. 元素异步转换: 同方法一,使用onItem().transformToUniAndMerge()。
  3. 收集并等待: 在transformToUniAndMerge返回的Multi上调用collect().asList()将其所有元素收集到一个Uni>中,然后使用await().indefinitely()阻塞当前线程,直到该Uni完成。

代码示例

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncListProcessorBlocking {

    private final ExecutorService executor = Executors.newFixedThreadPool(3); // 示例用线程池

    // 模拟一个返回Future的耗时操作 (同上)
    private Future processFuture(String s) {
        return executor.submit(() -> {
            System.out.println("开始处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            try {
                Thread.sleep(new Random().nextInt(3000) + 1000); // 模拟1-4秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("处理中断", e);
            }
            System.out.println("结束处理 (Future): " + s + " on thread " + Thread.currentThread().getName());
            return s.toUpperCase();
        });
    }

    // 将Future封装成Uni
    private Uni processItemAsUni(String item) {
        return Uni.createFrom().future(processFuture(item));
    }

    public List processListBlocking(List items) {
        System.out.println("\n--- 启动阻塞式处理 ---");
        List results = Uni.createFrom()
            .item(items)
            .onItem().transformToMulti(Multi.createFrom()::iterable)
            .onItem().transformToUniAndMerge(this::processItemAsUni)
            .collect().asList() // 收集所有结果到一个 Uni>
            .await().indefinitely(); // 阻塞当前线程直到所有结果收集完毕

        System.out.println("--- 阻塞式处理完成 ---");
        return results;
    }

    public static void main(String[] args) {
        AsyncListProcessorBlocking processor = new AsyncListProcessorBlocking();
        List data = List.of("alpha", "beta", "gamma", "delta", "epsilon");

        List processedResults = processor.processListBlocking(data);
        System.out.println("所有处理结果 (阻塞式): " + processedResults);

        processor.executor.shutdown(); // 关闭线程池
    }
}

注意事项

await().indefinitely()会阻塞调用线程。虽然它能确保所有异步操作完成,但在响应式系统中应谨慎使用,因为它可能导致线程阻塞,降低系统的并发能力。它更适用于启动时的数据加载、测试场景或需要等待所有结果才能继续的特定批处理任务。在Web应用中,应避免在处理请求的线程中使用await(),以防阻塞请求处理。

总结与最佳实践

Mutiny提供了灵活且强大的机制来处理Uni>中的元素异步操作。选择哪种

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

480

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

480

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

74

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

28

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

59

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

35

2025.11.27

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

344

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2074

2023.08.14

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

36

2026.01.14

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.6万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号