
在Reactor中,Flux和Mono是响应式流的核心构建块,它们代表了0到N个(Flux)或0到1个(Mono)元素的异步序列。它们本质上是发布者(Publisher),负责发出事件,而不是提供一个直接的“注入”或“发送”方法供外部调用。这意味着,你不能像操作一个队列那样,直接向一个已经存在的Flux实例调用一个类似emit(object)的方法来添加元素。
当你从一个外部库获得一个Flux<MappedType>实例时,例如:
Flux<MappedType> aFluxMap = Library.createMappingToMappedType();
这个aFluxMap已经是一个完整的发布者,它有自己的数据源和处理逻辑。你通常可以订阅它来消费其产生的MappedType事件,例如通过aFluxMap.doOnNext(converted -> doJob(converted))。然而,直接向它“发送”你的自定义对象以期望它进行转换并发出,是不符合其设计模式的。
为了能够动态地发出自定义事件,Reactor提供了FluxProcessor和FluxSink。FluxProcessor是一个特殊的类型,它既是Subscriber又是Publisher,允许你向其发送事件(作为Subscriber),并从它接收事件(作为Publisher)。FluxSink则是FluxProcessor的一个接口,提供了next()、error()和complete()等方法,用于精确控制事件的发射。
以下是如何创建一个可控的Flux并向其发射事件的基本示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
// 假设我们有某种RawType和MappedType
class RawType { String data; public RawType(String data) { this.data = data; } @Override public String toString() { return "RawType(" + data + ")"; } }
class MappedType { String mappedData; public MappedType(String mappedData) { this.mappedData = mappedData; } @Override public String toString() { return "MappedType(" + mappedData + ")"; } }
public class CustomFluxEmitter {
public static void main(String[] args) {
// 1. 创建一个UnicastProcessor作为我们自定义事件的源
UnicastProcessor<RawType> customRawProcessor = UnicastProcessor.create();
// 2. 获取FluxSink,用于向customRawProcessor发射事件
FluxSink<RawType> rawSink = customRawProcessor.sink();
// 3. 将自定义的RawType流转换为MappedType流
// 这里假设我们有一个转换函数,或者MappedType可以直接从RawType构建
Flux<MappedType> yourCustomMappedFlux = customRawProcessor
.map(raw -> new MappedType("Mapped(" + raw.data + ")"));
// 此时 yourCustomMappedFlux 是一个可以由 rawSink 控制的 MappedType 流
yourCustomMappedFlux.subscribe(
mapped -> System.out.println("Custom Mapped Type: " + mapped),
error -> System.err.println("Error in custom flux: " + error),
() -> System.out.println("Custom flux completed")
);
// 4. 模拟发射自定义事件
rawSink.next(new RawType("Input A"));
rawSink.next(new RawType("Input B"));
// rawSink.complete(); // 可以在适当时候完成流
}
}这段代码展示了如何创建一个由你控制的Flux (yourCustomMappedFlux),并通过rawSink向其发射RawType事件,这些事件随后被转换为MappedType。
既然不能直接向外部库的Flux注入事件,那么最常见的解决方案是创建一个你自己的可控Flux,然后使用Reactor的组合操作符(如merge、concat、zip)将其与外部库的Flux合并。这样,你就拥有了一个包含两部分事件的统一流:一部分来自外部库,另一部分来自你的自定义发射器。
考虑到你的目标是“发射一些对象到aFluxMap以获取MappedType”,并且aFluxMap本身已经是Flux<MappedType>,这意味着你希望将你的自定义MappedType事件与aFluxMap产生的MappedType事件合并。
以下是使用Flux.merge操作符的示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import java.time.Duration;
// 假设 MappedType 已经定义,并且 Library 提供了 createMappingToMappedType 方法
// 模拟外部库的Flux
class Library {
public static Flux<MappedType> createMappingToMappedType() {
// 模拟一个每秒发出一个 MappedType 的外部 Flux
return Flux.interval(Duration.ofSeconds(1))
.take(3) // 只发出3个元素
.map(i -> new MappedType("Library Mapped " + i));
}
}
public class MergeFluxExample {
public static void main(String[] args) throws InterruptedException {
// 1. 获取外部库的 Flux
Flux<MappedType> aFluxMap = Library.createMappingToMappedType();
// 2. 创建一个可控的 Flux,用于发射你的自定义 MappedType 事件
UnicastProcessor<MappedType> customProcessor = UnicastProcessor.create();
FluxSink<MappedType> customSink = customProcessor.sink();
Flux<MappedType> yourCustomFlux = customProcessor;
// 3. 使用 Flux.merge 合并两个 Flux
// merge操作符会将两个或更多Publisher的元素交错合并到一个新的Flux中
Flux<MappedType> combinedFlux = Flux.merge(aFluxMap, yourCustomFlux);
// 4. 订阅合并后的 Flux 并处理事件
combinedFlux.doOnNext(mapped -> System.out.println("Received: " + mapped))
.doOnComplete(() -> System.out.println("Combined Flux Completed"))
.subscribe();
// 5. 模拟在运行时发射自定义 MappedType 事件
System.out.println("Emitting custom events...");
Thread.sleep(500); // 等待一下,让library的flux先开始
customSink.next(new MappedType("Custom A"));
Thread.sleep(1200);
customSink.next(new MappedType("Custom B"));
Thread.sleep(1200);
customSink.next(new MappedType("Custom C"));
customSink.complete(); // 完成自定义流
// 等待一段时间观察输出
Thread.sleep(5000);
}
}在这个例子中,Flux.merge(aFluxMap, yourCustomFlux)创建了一个新的Flux,它会同时监听aFluxMap和yourCustomFlux,并将它们发出的MappedType事件交错地
以上就是如何向现有Reactor Flux注入自定义事件流的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号