
在响应式编程中,Flux代表一个0到N个元素的异步序列。当面对由外部库提供、我们无法直接控制其数据发射机制的Flux时,如何将我们自己的数据注入其中,或将其与我们自己的数据流结合,是一个常见的挑战。通常,Flux本身不提供直接的emit方法,因为它的设计理念是作为数据流的消费者而非直接的生产者(对于外部数据源而言)。
假设我们有一个外部库方法,它返回一个Flux<MappedType>:
Flux<MappedType> aFluxMap = Library.createMappingToMappedType();
我们希望能够将自己的原始对象(例如myObj)发送到这个aFluxMap中,让它们被处理并转换为MappedType,然后继续后续操作。直观上,我们可能期望有一个类似aFluxMap.emit(myObj)的方法,但这样的方法在Flux或Mono中并不存在。
一种常见的误解是尝试使用FluxProcessor和FluxSink来解决:
FluxProcessor p = UnicastProcessor.create().serialize(); FluxSink sink = p.sink(); sink.next(mess); // 这会将消息发送到新创建的Flux 'p'
这种方法的问题在于,它创建了一个新的Flux (p) 并向其发送消息,而不是将消息发送到我们已有的aFluxMap。我们需要的是将我们自己的数据流与aFluxMap关联起来。
Reactor框架提供了强大的操作符来组合和合并不同的数据流。解决上述问题的核心思路是:
在Reactor 3.4及更高版本中,推荐使用Sinks API来创建可控的发射器。Sinks.many()可以创建一个多值发射器,它提供了tryEmitNext、tryEmitComplete等方法来安全地发射数据。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
// 假设 MappedType 是一个已定义的类型
class MappedType {
private String value;
// 构造函数、getter等
public MappedType(String value) { this.value = value; }
public String getValue() { return value; }
@Override
public String toString() { return "MappedType(" + value + ")"; }
}
public class CustomEmitter {
private final Sinks.Many<String> myDataSink = Sinks.many().multicast().onBackpressureBuffer();
private final Flux<String> myDataFlux = myDataSink.asFlux();
// 模拟外部库的Flux
public Flux<MappedType> createExternalFlux() {
return Flux.just("External1", "External2")
.map(s -> new MappedType("External-" + s));
}
// 发射自定义数据的方法
public void emitMyData(String data) {
myDataSink.tryEmitNext(data);
}
public Flux<String> getMyDataFlux() {
return myDataFlux;
}
public static void main(String[] args) {
CustomEmitter emitter = new CustomEmitter();
// 1. 获取外部库的Flux
Flux<MappedType> aFluxMap = emitter.createExternalFlux();
// 2. 创建我们自己的Flux(这里我们直接使用原始字符串,稍后进行转换)
Flux<String> customRawDataFlux = emitter.getMyDataFlux();
// 3. 将自定义原始数据转换为 MappedType
Flux<MappedType> customMappedDataFlux = customRawDataFlux
.map(raw -> {
System.out.println("Converting custom raw data: " + raw);
// 模拟转换逻辑
return new MappedType("Custom-" + raw);
});
// 4. 合并两个Flux
Flux<MappedType> combinedFlux = Flux.merge(aFluxMap, customMappedDataFlux);
// 订阅合并后的Flux并处理数据
combinedFlux.doOnNext(converted -> System.out.println("Received: " + converted))
.subscribe(
null, // onNext
error -> System.err.println("Error: " + error),
() -> System.out.println("Combined Flux completed.")
);
// 动态发射自定义数据
emitter.emitMyData("MyDataA");
emitter.emitMyData("MyDataB");
// 模拟外部Flux完成后,手动完成自定义Flux
// 如果不手动complete,程序会一直运行等待更多数据
// myDataSink.tryEmitComplete(); // 实际应用中根据业务逻辑决定何时完成
}
}在上述示例中:
根据具体需求选择合适的合并操作符。在大多数动态注入数据的场景中,merge是最常用的,因为它允许并发处理来自不同源的数据。
在原始问题中提到了一个重要的更新: Library.createMappingToMappedType() 返回的aFluxMap的内部源可能是一个UnicastProcessor,并且该UnicastProcessor可能已经被库内部订阅。当尝试通过p.flatMap(raw -> aFluxMap).subscribe()再次订阅aFluxMap时,会遇到"UnicastProcessor can be subscribe once"的异常。
解释: UnicastProcessor是一个单播处理器,它只允许一个订阅者。一旦被订阅,它就会开始发射数据。如果尝试第二次订阅,就会抛出异常。
解决方案:
在使用Sinks.many().multicast().onBackpressureBuffer()时,我们选择了onBackpressureBuffer策略,这意味着如果下游消费者处理速度慢于上游发射速度,Sink会缓冲元素。根据实际需求,你可能需要选择其他背压策略,例如onBackpressureDrop(丢弃元素)或onBackpressureError(发出错误)。
当你的自定义数据流不再有数据需要发射时,记得调用myDataSink.tryEmitComplete()来发出完成信号。这对于下游的Flux操作符(如merge)来说很重要,它们需要知道何时所有上游源都已完成,以便自己也能完成。
在Reactor中,向一个由外部库提供的现有Flux动态注入数据,并非通过直接的emit方法实现,而是通过创建一个可控的自定义Flux,然后利用Flux.merge()等操作符将其与外部Flux合并。这种模式遵循了响应式编程的原则,即通过组合和转换数据流来构建复杂的业务逻辑。同时,需要特别注意UnicastProcessor等单订阅源的特性,避免不当的重复订阅操作。
以上就是Reactor Flux动态数据注入与多源合并策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号