
- 类型中的每个元素。通过将Uni
- @ExtendWith(VertxExtension.class) 启用Vert.x JUnit 5扩展。
- testContext.completeNow() 和 testContext.fail(throwable) 用于管理测试的异步完成状态。
- processItemAsync 方法模拟了一个真实的异步操作,它返回一个Uni
。 - onItem().transformToMulti(Multi.createFrom()::iterable) 将Uni
- >转换为Multi
,使得每个字符串成为一个独立的流事件。 - onItem().transformToUniAndMerge(this::processItemAsync) 是关键。它为Multi中的每个字符串创建一个新的Uni(通过processItemAsync),并并发地订阅这些Uni。所有这些Uni的结果会合并回一个新的Multi中,按它们完成的顺序发出。
- onTermination().invoke(...) 确保无论流是成功完成还是因错误终止,testContext都能得到通知,从而正确结束测试。
- 与方案一类似,我们首先将Uni
- >转换为Multi
,然后使用onItem().transformToUniAndMerge()进行并发处理。 - collect().asList() 操作符会将Multi中发出的所有项收集到一个List中,并将其包装在一个Uni
- >中。
- await().indefinitely() 是一个阻塞操作。它会阻塞当前线程,直到上游的Uni发出其项(即所有元素都被处理并收集到列表中)。一旦Uni发出项,await().indefinitely()就会返回这个项。
- 这种方法适用于需要同步获取所有异步操作结果的场景,但请注意,它会阻塞调用线程。
-
Uni与Multi的选择:
- Uni代表一个单一的、异步产生的结果。
- Multi代表一个可以产生零个、一个或多个结果的异步流。
- 当需要对集合中的每个元素进行独立处理时,将Uni
- >转换为Multi
是关键步骤。
-
transformToUniAndMerge的并发特性:
- 这个操作符是实现并发处理的核心。它会为每个流元素创建并订阅一个新的Uni,这些Uni会并行执行。
- 结果的发出顺序可能与原始列表中的顺序不同,而是取决于各个Uni完成的速度。如果需要保持顺序,可以考虑使用transformToUniAndConcatenate或在收集结果后进行排序。
-
异步操作的终止管理:
- 在Mutiny中,subscribe()是非阻塞的。如果主程序没有显式地等待异步流完成,它可能会在所有操作完成之前退出。
- 在测试场景中,Vert.x Unit的TestContext和Async对象提供了一种优雅的方式来管理测试的生命周期。
- 在应用程序代码中,如果需要等待所有结果,collect().asList().await().indefinitely()提供了一种阻塞等待的机制。
-
资源管理:
- 如果processItemAsync内部使用了线程池(如ExecutorService),请确保在所有异步任务完成后关闭这些资源,以避免资源泄露。在示例中,我们都在流终止或主程序结束时调用了executor.shutdown()。
- 转换为Multi流,并利用transformToUniAndMerge操作符实现并发处理,我们展示了两种关键策略:一种是结合Vert.x Unit进行异步测试管理,另一种是使用collect().asList().await().indefinitely()阻塞并收集所有结果,从而避免程序过早终止的问题。
异步处理Uni的挑战
在mutiny中,当我们拥有一个uni>类型的响应式流,并希望对列表中的每一个t元素进行异步处理时,直接使用map操作将列表中的每个元素转换为新的uni,然后尝试通过flatmap合并这些uni,可能会遇到程序过早终止的问题。例如,原始尝试的代码片段:
Uni> listUni = Uni.createFrom().item(List.of("a", "b", "c")); listUni.map(strings -> strings.stream().map(this::processItem).toList()) // 将List
映射为List > .flatMap(unis -> Uni.join().all(unis).andCollectFailures()) // 合并所有Uni .subscribe() .with(System.out::println); // 模拟异步处理的方法 private Uni processItem(String item) { // 假设这里是一个耗时的异步操作 return Uni.createFrom().item(item) .onItem().delayIt().by(Duration.ofSeconds(1)) .invoke(() -> System.out.println("Processing item: " + item)) .replaceWithVoid(); }
这种方法的问题在于,subscribe().with()本身是异步的。如果主程序没有显式地等待这些异步操作完成,它可能会在所有Uni完成之前退出,导致看起来只有部分(甚至没有)元素被处理。为了确保所有异步操作都能执行完毕,我们需要一种机制来管理这些异步流的生命周期。
方案一:结合Multi和Vert.x Unit进行异步测试
在测试环境中,尤其是使用Vert.x Unit等框架时,我们可以利用其提供的异步上下文来管理响应式流的生命周期。核心思想是将Uni>转换为Multi
1. 从Uni到Multi的转换
Uni>代表一个单一的、未来会产生一个列表的事件。要对列表中的每个元素进行流式处理,我们需要将其转换为Multi
2. 使用transformToUniAndMerge实现并发
一旦拥有了Multi
3. Vert.x Unit Async 管理测试生命周期
在测试方法中,Vert.x Unit的TestContext提供了一个Async对象。通过在流的onTermination()回调中调用async.complete(),我们可以明确地告诉测试框架,当所有流操作完成(无论成功或失败)时,测试可以结束。这确保了主程序(或测试)会等待所有异步任务完成。
示例代码
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {
private final ExecutorService executor = Executors.newFixedThreadPool(3); // 用于模拟耗时操作的线程池
// 模拟一个耗时的异步处理方法
private Uni processItemAsync(String item) {
Random random = new Random();
int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
System.out.println("Starting processing for '" + item + "' with duration " + duration + "ms on thread: " + Thread.currentThread().getName());
// 使用Uni.createFrom().future来集成外部异步任务
return Uni.createFrom().future(() -> executor.submit(() -> {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Finished processing for '" + item + "' on thread: " + Thread.currentThread().getName());
return item.toUpperCase(); // 返回处理后的结果
}));
}
@Test
public void testAsyncProcessingWithVertxUnit(VertxTestContext testContext) {
// 创建一个Uni>
Uni.createFrom().item(List.of("a", "b", "c", "d"))
// 将Uni>转换为Multi,以便逐个处理元素
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对Multi中的每个元素应用异步转换,并合并结果
.onItem().transformToUniAndMerge(this::processItemAsync)
// 订阅并处理每个完成的项
.onItem().invoke(s -> System.out.println("Received processed item: " + s))
// 当流终止时(所有元素处理完毕或发生错误),通知测试上下文
.onTermination().invoke((throwable, isCancelled) -> {
if (throwable != null) {
testContext.fail(throwable); // 如果有错误,测试失败
} else {
System.out.println("All items processed successfully.");
testContext.completeNow(); // 所有任务完成,测试成功
}
executor.shutdown(); // 关闭线程池
})
.subscribe()
.with(
item -> System.out.println("Printing final result: " + item), // 实际的订阅者处理
testContext::fail // 错误处理
);
}
}
代码解释:
方案二:收集结果并阻塞等待
在非测试场景或需要程序阻塞直到所有异步操作完成并收集结果时,我们可以使用collect().asList().await().indefinitely()。这种方法会等待所有并发处理的Uni完成,并将它们的输出收集到一个List中,然后阻塞当前线程直到这个List可用。
示例代码
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncListProcessingBlocking {
private static final ExecutorService executor = Executors.newFixedThreadPool(3); // 用于模拟耗时操作的线程池
// 模拟一个耗时的异步处理方法
private Uni processItemAsync(String item) {
Random random = new Random();
int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
System.out.println("Starting processing for '" + item + "' with duration " + duration + "ms on thread: " + Thread.currentThread().getName());
return Uni.createFrom().future(() -> executor.submit(() -> {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Finished processing for '" + item + "' on thread: " + Thread.currentThread().getName());
return item.toUpperCase();
}));
}
public static void main(String[] args) {
AsyncListProcessingBlocking processor = new AsyncListProcessingBlocking();
System.out.println("Starting main processing...");
List results = Uni.createFrom().item(List.of("x", "y", "z", "w"))
// 将Uni>转换为Multi
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对Multi中的每个元素应用异步转换,并合并结果
.onItem().transformToUniAndMerge(processor::processItemAsync)
// 打印每个完成的项(可选)
.onItem().invoke(s -> System.out.println("Intermediate result received: " + s))
// 收集所有结果到一个列表中,并阻塞当前线程直到所有结果可用
.collect().asList()
.await().indefinitely(); // 阻塞等待,直到所有结果收集完毕
System.out.println("All items processed. Final collected results: " + results);
executor.shutdown(); // 关闭线程池
System.out.println("Main processing finished.");
}
}
代码解释:
核心概念与注意事项
总结
异步处理Uni中的元素是响应式编程中的常见需求。通过将Uni
>转换为Multi











