
webflux中的`repeat`操作符用于重复订阅上游流,而`then`操作符则在当前流完成后切换到新的流。理解这两个操作符的组合行为至关重要,特别是当`then`操作符的位置在`repeat`之前或之后时,流的执行顺序和重复范围会产生显著差异。本文将深入探讨这些交互模式,并通过代码示例揭示其底层机制。
repeat操作符是Reactor中一个强大的功能,它允许我们指定一个Publisher在完成后重新订阅其上游,从而重复发射数据序列。其基本行为是,它会重新执行所有位于它之前的操作符链。
考虑以下示例,展示了repeat操作符如何使其上游的doOnNext操作重复执行:
import reactor.core.publisher.Mono;
public class RepeatExample {
    public static void main(String[] args) {
        Mono.just(5)
                .doOnNext(i -> System.out.println("next 1: " + i))
                .doOnNext(i -> System.out.println("next 2: " + i))
                .doOnNext(i -> System.out.println("next 3: " + i))
                .repeat(2) // 重复2次,总共执行3次
                .subscribe();
    }
}输出:
next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5
从输出可以看出,repeat(2)使得整个Mono.just(5)及其后续的doOnNext操作重复执行了3次(原始执行1次 + 重复2次)。这表明repeat操作符会重新订阅其上游的整个序列。
即使将repeat操作符的位置移动到中间,只要它位于doOnNext链的某个位置,它依然会使其上游的所有操作重复。
import reactor.core.publisher.Mono;
public class RepeatExample2 {
    public static void main(String[] args) {
        Mono.just(5)
                .doOnNext(i -> System.out.println("next 1: " + i))
                .repeat(2) // 重复2次
                .doOnNext(i -> System.out.println("next 2: " + i))
                .doOnNext(i -> System.out.println("next 3: " + i))
                .subscribe();
    }
}输出:
next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5 next 1: 5 next 2: 5 next 3: 5
这个例子进一步确认,repeat操作符会影响其上游的整个流,使其重新订阅。
then操作符在Webflux中用于在当前Publisher完成(即发射了所有元素或发出了完成信号)之后,订阅并发射另一个Publisher的元素。它是一个非常重要的操作符,因为它会“切换”流的上下文,并且通常会改变流的类型(例如,从Flux<T>到Mono<V>)。
repeat和then的组合行为取决于它们在操作符链中的相对位置。
当then操作符位于repeat操作符之前时,then及其后续的操作都会被repeat操作符所包含,因此会被重复执行。
import reactor.core.publisher.Mono;
public class RepeatThenExample1 {
    public static void main(String[] args) {
        Mono.just(5)
                .doOnNext(i -> System.out.println("next 1: " + i))
                .doOnNext(i -> System.out.println("next 2: " + i))
                .then(Mono.just("hello")) // then 操作符在 repeat 之前
                .doOnNext(s -> System.out.println("next 3: " + s))
                .repeat(2) // 重复2次
                .subscribe();
    }
}输出:
next 1: 5 next 2: 5 next 3: hello next 1: 5 next 2: 5 next 3: hello next 1: 5 next 2: 5 next 3: hello
在这个例子中,repeat(2)操作符位于then(Mono.just("hello"))之后。这意味着整个链条——包括Mono.just(5)、两个doOnNext、then以及其后的doOnNext——都被视为repeat的上游,因此整个序列被重复执行了3次。
这是最容易引起混淆的情况。当then操作符位于repeat操作符之后时,行为会发生显著变化。
import reactor.core.publisher.Mono;
public class RepeatThenExample2 {
    public static void main(String[] args) {
        Mono.just(5)
                .doOnNext(i -> System.out.println("next 1: " + i)) // Mono 流
                .repeat(2) // 将 Mono 转换为 Flux,并重复执行其上游
                .doOnNext(i -> System.out.println("next 2: " + i))  // Flux 流
                .then(Mono.just("hello")) // then 操作符在 repeat 之后,作用于已完成的 Flux
                .doOnNext(s -> System.out.println("next 3: " + s)) // Mono 流
                .subscribe();
    }
}输出:
next 1: 5 next 2: 5 next 1: 5 next 2: 5 next 1: 5 next 2: 5 next 3: hello
观察这个输出,next 1: 5 和 next 2: 5 被重复了3次,但是 next 3: hello 只出现了1次。这是为什么呢?
核心原理:
可以这样理解:repeat(2) 创造了一个新的“重复区域”,这个区域内的操作会被重复。而 then 操作符在整个“重复区域”完成之后才被触发,因此它不属于这个重复区域。
以上就是Webflux repeat 与 then 操作符的交互行为深度解析的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号