
在响应式编程中,我们经常会遇到需要处理包含多个元素的异步操作。例如,有一个uni<list<string>>,我们希望对列表中的每个字符串都执行一个耗时的异步任务,并最终收集或处理所有任务的结果。
一个常见的尝试是使用map将List<String>转换为List<Uni<Void>>,然后通过Uni.join().all(unis).andCollectFailures()来合并这些Uni。然而,这种方法可能无法达到预期的并发处理效果,或者在短生命周期的程序(如单元测试)中,由于主线程过早退出,导致异步任务未能完成就被终止,从而给人一种“只处理了第一个元素”的错觉。
问题的核心在于,Uni<List<String>>本身代表的是一个单值流,其值是一个完整的列表。如果想对列表中的每个元素进行异步操作,并将其视为独立的响应式事件,就需要将这个列表“展开”成一个可以逐个处理的流。Mutiny提供了Multi类型来处理零到N个元素的流,这正是解决此类问题的关键。
Mutiny是Quarkus等框架中广泛使用的响应式编程库,它提供了两种核心类型:
要实现对Uni<List<String>>中每个元素的异步并发处理,我们需要将Uni<List<String>>首先转换为一个Multi<String>,这样列表中的每个字符串就成为了Multi流中的一个独立事件。然后,我们可以对这个Multi流中的每个事件应用异步转换。
在单元测试或需要非阻塞等待所有异步操作完成的场景中,我们可以利用Multi的特性和onTermination().invoke()回调来确保所有任务执行完毕。以下示例结合了Vert.x Unit,它提供了一个Async机制来管理异步测试的生命周期。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {
// 模拟一个异步操作,返回一个Uni
private Uni<String> processItemAsync(String item, Random random) {
final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
System.out.println("Starting process for: " + item + ", duration: " + duration + "ms");
return Uni.createFrom().item(item)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Finished process for: " + item));
}
@Test
public void testAsyncProcessingWithVertxUnit(VertxTestContext context) {
Random random = new Random();
// Vert.x Unit的Async对象,用于通知测试框架异步操作何时完成
context.verify(() -> { // 确保在VertxTestContext的上下文中执行
Uni.createFrom()
.item(List.of("a", "b", "c")) // 初始的Uni<List<String>>
// 1. 将Uni<List<String>>转换为Multi<String>
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
.onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
// 3. 订阅Multi流,处理每个完成的元素
.subscribe()
.with(
s -> System.out.println("Printing result: " + s), // 成功处理每个元素
context::failNow, // 任何错误导致流失败
context::completeNow // 流完成,通知VertxTestContext测试结束
);
});
}
}代码解释:
在某些场景下,例如在命令行工具或需要等待所有异步操作完成后才能继续主程序执行时,我们可以选择阻塞当前线程直到所有结果都被收集。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
public class BlockingAsyncListProcessing {
private static Uni<String> processItemAsync(String item, Random random) {
final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
return Uni.createFrom().item(item)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Letter: " + item + ", duration in ms: " + duration));
}
public static void main(String[] args) {
Random random = new Random();
System.out.println("Starting blocking asynchronous processing...");
List<String> results = Uni.createFrom()
.item(List.of("a", "b", "c")) // 初始的Uni<List<String>>
// 1. 将Uni<List<String>>转换为Multi<String>
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
.onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
// 3. 可选:处理每个完成的元素
.onItem().invoke(s -> System.out.println("Printing collected item: " + s))
// 4. 将Multi中的所有元素收集到一个列表中
.collect().asList()
// 5. 阻塞当前线程,直到Uni<List<String>>完成并返回结果
.await().indefinitely();
System.out.println("All items processed. Collected results: " + results);
}
}代码解释:
通过Mutiny的Multi类型和onItem().transformToUniAndMerge()操作符,我们可以有效地将Uni<List<T>>中的每个元素转换为独立的异步任务并进行并发处理。根据应用场景的不同,我们可以选择非阻塞的订阅模式(适用于响应式系统和测试)或阻塞式的await()模式(适用于需要同步等待结果的特定场景)。理解并正确运用这些Mutiny操作符是构建高效、健壮的响应式应用程序的关键。
以上就是Mutiny异步流处理:高效并发处理Uni中的元素的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号