
在响应式编程中,我们经常会遇到这样的场景:一个异步操作返回一个mono<t>(或flux<t>),我们需要从这个t类型对象中提取某个字段,然后使用这个字段作为参数去执行另一个异步操作,最终得到mono<r>。直接在mono<t>上调用.block()来获取t对象,然后提取字段并调用下一个服务,会阻塞当前线程,这与响应式编程的非阻塞、异步特性相悖。
例如,我们有一个获取订单的Mono<Order>,Order对象中包含一个truckId。我们希望利用这个truckId去获取Mono<Truck>,整个过程必须是非阻塞的。
public class Order {
private UUID id;
private String name;
private UUID truckId; // 我们需要提取的字段
// 构造函数、Getter/Setter略
public UUID getTruckId() {
return truckId;
}
}
public class Truck {
private UUID id;
private String model;
// 构造函数、Getter/Setter略
}
// 假设的服务接口
interface OrderService {
Mono<Order> getById(UUID id);
}
interface VehicleService {
Mono<Truck> getByTruckId(UUID truckId);
}我们的目标是:
flatMap是Reactor中一个非常重要的操作符,它允许我们将一个发出T的Mono(即Mono<T>)转换为一个发出R的Mono(即Mono<R>),其中R的生成依赖于T。更具体地说,flatMap接收一个函数,这个函数将T映射为一个新的响应式类型(如Mono<R>或Flux<R>)。
如果我们的最终目标仅仅是获取Mono<Truck>,而不再需要原始的Order对象,那么flatMap是理想的选择。
import reactor.core.publisher.Mono;
import java.util.UUID;
public class ReactiveDataExtraction {
private OrderService orderService; // 假设已注入
private VehicleService vehicleService; // 假设已注入
// 模拟服务方法
private Mono<Order> getById(UUID id) {
// 实际应用中会调用orderService.getById(id)
return Mono.just(new Order(id, "Test Order", UUID.randomUUID()));
}
private Mono<Truck> getByTruckId(UUID truckId) {
// 实际应用中会调用vehicleService.getByTruckId(truckId)
return Mono.just(new Truck(truckId, "Volvo FH"));
}
public Mono<Truck> getTruckFromOrder(UUID orderId) {
Mono<Order> orderMono = getById(orderId);
// 使用flatMap从Mono<Order>中提取truckId并调用getByTruckId
Mono<Truck> truckMono = orderMono.flatMap(order -> getByTruckId(order.getTruckId()));
return truckMono;
}
public static void main(String[] args) {
ReactiveDataExtraction example = new ReactiveDataExtraction();
UUID testOrderId = UUID.randomUUID();
example.getTruckFromOrder(testOrderId)
.subscribe(
truck -> System.out.println("成功获取到卡车信息: " + truck.getModel()),
error -> System.err.println("获取卡车信息失败: " + error.getMessage())
);
// 为了演示非阻塞,通常需要等待异步操作完成
try {
Thread.sleep(1000); // 实际应用中不会这样阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}解释:orderMono.flatMap(order -> getByTruckId(order.getTruckId()))这行代码的含义是:
有时,我们不仅需要链式调用得到的结果(如Truck),还需要原始的数据(如Order)来构建一个更复杂的聚合对象。在这种情况下,我们可以结合使用flatMap和Mono.zip。
假设我们希望将Order和它对应的Truck组合成一个新的Result对象。
首先,定义一个聚合结果类:
public class Result {
private Order order;
private Truck truck;
public Result(Order order, Truck truck) {
this.order = order;
this.truck = truck;
}
// Getter/Setter略
@Override
public String toString() {
return "Result{" +
"orderId=" + (order != null ? order.getId() : "null") +
", orderName='" + (order != null ? order.getName() : "null") + '\'' +
", truckId=" + (truck != null ? truck.getId() : "null") +
", truckModel='" + (truck != null ? truck.getModel() : "null") + '\'' +
'}';
}
}然后,实现聚合逻辑:
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2; // 用于Mono.zip的默认输出
import java.util.UUID;
public class ReactiveDataAggregation {
private OrderService orderService; // 假设已注入
private VehicleService vehicleService; // 假设已注入
// 模拟服务方法
private Mono<Order> getById(UUID id) {
return Mono.just(new Order(id, "Test Order " + id.toString().substring(0,4), UUID.randomUUID()));
}
private Mono<Truck> getByTruckId(UUID truckId) {
return Mono.just(new Truck(truckId, "Model-" + truckId.toString().substring(0,4)));
}
public Mono<Result> getOrderAndTruck(UUID orderId) {
Mono<Order> orderMono = getById(orderId);
// 1. 从orderMono派生出truckMono
// 注意:这里truckMono的生成依赖于orderMono,它们是串行的
Mono<Truck> truckMono = orderMono.flatMap(order -> getByTruckId(order.getTruckId()));
// 2. 使用Mono.zip组合原始的orderMono和派生出的truckMono
// Mono.zip会等待两个Mono都发出值后,将它们组合成一个Tuple2
// 这里需要注意,如果orderMono和truckMono是独立的,zip会并行处理。
// 但由于truckMono的创建依赖于orderMono,这里的zip实际上会在orderMono发出值后,
// 再等待truckMono发出值。
Mono<Tuple2<Order, Truck>> zippedMono = Mono.zip(orderMono, truckMono);
// 3. 使用flatMap将Tuple2映射为自定义的Result对象
Mono<Result> resultMono = zippedMono.flatMap(tuple ->
Mono.just(new Result(tuple.getT1(), tuple.getT2()))
);
return resultMono;
}
public static void main(String[] args) {
ReactiveDataAggregation example = new ReactiveDataAggregation();
UUID testOrderId = UUID.randomUUID();
example.getOrderAndTruck(testOrderId)
.subscribe(
result -> System.out.println("成功聚合订单和卡车信息: " + result),
error -> System.err.println("聚合信息失败: " + error.getMessage())
);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}解释:
通过flatMap和Mono.zip等核心操作符,Reactor提供了一种强大而优雅的方式来处理异步数据流中的依赖关系和数据聚合。从Mono中非阻塞地提取内部字段并进行链式服务调用,是构建高性能、高响应性应用程序的关键技术。掌握这些模式,将使你能够更好地利用响应式编程的优势,构建出健壮且可扩展的系统。
以上就是Reactor链式操作:从Mono中提取数据并进行服务编排的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号