中的元素:从问题到解决方案
" />
在quarkus或类似的响应式编程环境中,uni<list<t>>表示一个最终会发出一个list<t>的异步操作。当我们需要对这个列表中的每个元素执行另一个异步操作时,常见的挑战是如何确保所有这些独立的异步操作都能被正确地触发、执行并等待其完成,而不会因为主线程的提前退出而导致部分操作未能完成。
初始尝试可能涉及将Uni<List<String>>通过map操作转换为List<Uni<Void>>,然后使用Uni.join().all(unis).andCollectFailures()来合并这些Uni。然而,如果后续直接使用.subscribe().with(System.out::println),由于subscribe是非阻塞的,程序可能会在所有异步任务完成之前就退出,从而给人一种只有第一个元素被处理的错觉。核心问题在于如何管理这些异步操作的生命周期,确保在所有任务真正完成之前,程序不会终止。
为了高效且可靠地处理Uni<List>中的每个元素,并确保所有异步操作都能在主程序退出前完成,我们可以将Uni<List>转换为Multi,从而将列表中的每个元素作为独立的流事件来处理。结合Vert.x Unit等测试框架的异步上下文,可以优雅地管理异步流的生命周期。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.time.Duration;
import java.util.List;
import java.util.Random;
@RunWith(VertxUnitRunner.class)
public class AsyncListProcessingExample {
@Test
public void testAsyncListProcessing(TestContext context) {
Random random = new Random();
// 用于在流终止时通知测试框架
Async async = context.async();
Uni.createFrom()
.item(List.of("a", "b", "c")) // 原始的Uni<List<String>>
// 将 Uni<List<String>> 转换为 Multi<String>,每个字符串作为独立的流事件
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对 Multi 中的每个字符串元素进行异步处理
.onItem().transformToUniAndMerge(s ->
// 为每个元素创建一个 Uni,模拟异步操作(这里是随机延迟)
Uni.createFrom().item(s)
.onItem().delayIt().by(Duration.ofMillis((random.nextInt(5) + 1) * 1000))
)
// 在流终止时执行回调,无论成功或失败
.onTermination().invoke((throwable, aBoolean) -> {
if (throwable != null) {
context.fail(throwable); // 如果有异常,测试失败
} else {
async.complete(); // 所有异步操作完成,通知测试通过
}
})
// 订阅 Multi,处理每个完成的元素
.subscribe()
.with(s -> System.out.println("Printing: " + s));
}
}在某些场景下,我们可能需要等待所有异步操作完成后,才继续执行后续的同步代码,或者需要将所有异步处理的结果收集到一个列表中。Mutiny提供了阻塞式等待和收集结果的机制。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
public class BlockingListProcessingExample {
public static void main(String[] args) {
Random random = new Random();
List<String> results = Uni.createFrom()
.item(List.of("a", "b", "c")) // 原始的Uni<List<String>>
// 将 Uni<List<String>> 转换为 Multi<String>
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对 Multi 中的每个字符串元素进行异步处理
.onItem().transformToUniAndMerge(s -> {
final int duration = (random.nextInt(5) + 1) * 1000;
// 为每个元素创建一个 Uni,模拟异步操作
return Uni.createFrom().item(s)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Letter: " + s + ", duration in ms: " + duration));
})
// 订阅 Multi,处理每个完成的元素
.onItem().invoke(s -> System.out.println("Printing: " + s))
// 收集所有结果到一个列表中
.collect().asList()
// 阻塞当前线程,直到所有结果都被收集完毕
.await().indefinitely();
System.out.println("All results collected: " + results);
}
}在Mutiny中异步处理Uni<List>中的元素,关键在于将Uni<List>有效地转换为Multi,以便对每个元素进行流式处理。
理解这两种模式及其适用场景,能够帮助开发者更灵活、高效地构建基于Mutiny的异步应用。
以上就是异步处理Uni中的元素:从问题到解决方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号