0

0

响应式编程中 Reactor Mono/Flux 实现轮询请求的策略与实践

碧海醫心

碧海醫心

发布时间:2025-11-09 15:06:01

|

1016人浏览过

|

来源于php中文网

原创

响应式编程中 reactor mono/flux 实现轮询请求的策略与实践

本教程探讨了在 Reactor 响应式编程中实现外部系统状态轮询的两种主要策略。首先介绍基于 `Mono` 的 `retryWhen` 机制,适用于在错误发生时重试。随后深入讲解利用 `Flux.interval` 实现固定间隔轮询的方法,该方法在控制请求频率、并发性及避免异常流控制方面具有优势,并提供了详细的代码示例和选择考量。

引言:响应式轮询的需求

在构建现代分布式系统时,我们经常需要与外部服务交互,并等待其状态从“处理中”变为“就绪”。这种持续检查外部状态直到满足特定条件的行为,通常通过轮询(Polling)机制实现。在基于 Project Reactor 的响应式编程范式中,实现高效、健壮且资源友好的轮询策略是关键。本文将深入探讨两种主要的 Reactor 轮询实现方法:基于 retryWhen 的错误重试机制和基于 Flux.interval 的固定间隔轮询策略。

基于 retryWhen 的轮询实现

最初,开发者可能会倾向于使用 Reactor 的 retryWhen 操作符来处理轮询场景。这种方法的核心思想是:当外部系统状态不满足条件时,通过抛出特定异常来触发重试机制,从而实现周期性的状态检查。

实现原理

该方法通常结合 WebClient 发起请求,并通过 filter 操作符检查返回的状态。如果状态不符合预期(例如,系统尚未就绪),则使用 switchIfEmpty 结合 Mono.error() 抛出一个自定义异常。随后,retryWhen 操作符捕获此异常,并根据配置的重试策略(如固定延迟)重新订阅上游流,再次发起请求。

以下是一个典型的实现示例:

花生AI
花生AI

B站推出的AI视频创作工具

下载
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;

// 假设 Status 是一个包含 isReady() 方法的枚举或类
// 假设 SystemStateNotReadyException 是一个自定义异常

public class PollingWithRetryWhen {

    private final WebClient webClient;
    private final int MAX_ATTEMPT = 5; // 最大重试次数
    private final Duration BACK_OFF = Duration.ofSeconds(1); // 重试间隔

    public PollingWithRetryWhen(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 模拟从外部系统获取状态。
     * 实际应用中会通过 webClient 调用外部API。
     */
    private Mono checkStatus() {
        return webClient.get()
                        .uri("/api/status")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(response -> Status.from(response)); // 假设 Status.from(String) 能够解析响应
    }

    /**
     * 轮询外部系统直到其状态变为就绪。
     * 如果未就绪,则抛出异常并重试。
     */
    public Mono pollUntilReady() {
        return checkStatus()
                .filter(Status::isReady) // 过滤出已就绪的状态
                .switchIfEmpty(
                    Mono.error(new SystemStateNotReadyException("System is not ready yet.")) // 如果未就绪,则抛出异常
                )
                .retryWhen(
                    Retry.fixedDelay(MAX_ATTEMPT, BACK_OFF) // 固定延迟重试
                         .filter(err -> err instanceof SystemStateNotReadyException) // 只对特定异常进行重试
                );
    }
}

// 示例 Status 类,代表外部系统状态
class Status {
    private final boolean ready;

    public Status(boolean ready) {
        this.ready = ready;
    }

    public boolean isReady() {
        return ready;
    }

    public static Status from(String response) {
        // 模拟解析逻辑,例如根据响应字符串判断是否就绪
        return new Status(response.contains("READY"));
    }
}

// 示例自定义异常,用于在系统未就绪时触发重试
class SystemStateNotReadyException extends RuntimeException {
    public SystemStateNotReadyException(String message) {
        super(message);
    }
}

优点与考量

  • 简洁性: 对于简单的重试逻辑,retryWhen 提供了一种声明式且易于理解的方式。
  • 错误驱动: 这种方法天然地将轮询与错误处理相结合,只有在状态不满足条件(被视为一种“错误”)时才触发重试。
  • 线程安全与内存: Reactor 框架本身在设计时就考虑了线程安全和资源管理。上述代码片段在典型使用场景下通常是线程安全的,并且不会导致内存泄漏,因为 Reactor 的操作符会妥善管理订阅和资源。

然而,retryWhen 的核心是“重试”,这意味着它通常在操作失败(抛出异常)后才触发。如果每次轮询请求本身是成功的(即返回了状态,只是状态值不符合预期),但我们仍想以固定间隔进行检查,那么通过抛出异常来控制流程可能不是最优雅或最高效的方式。

基于 Flux.interval 的高级轮询策略

当需要以精确的固定间隔进行轮询,并且希望更精细地控制轮询过程(例如,独立于每次请求的响应时间),或者避免使用异常来控制正常流程时,Flux.interval 提供了一个更强大的替代方案。

实现原理

Flux.interval 会周期性地发出递增的 Long 值,每个值代表一个时间间隔。我们可以将这些周期性信号映射(concatMap 或 flatMap)到我们的状态检查请求中。通过这种方式,即使某个状态检查请求耗时较长,下一个请求也会在预设的固定间隔后触发,而不是等待上一个请求完全完成后再计算延迟。

优点

  • 固定间隔: 确保请求以固定的时间间隔发送,独立于单个请求的响应时间。
  • 计数器: Flux.interval 发出的 Long 值可以作为轮询尝试的计数器。
  • 并发控制: 可以选择 concatMap(顺序执行,等待前一个完成)或 flatMap(并发执行,不等待前一个完成)来控制状态检查请求的并发行为。
  • 避免异常流控制: 不需要通过抛出和捕获异常来控制轮询流程,这在某些情况下可能带来性能优势。

示例代码

以下是一个使用 Flux.interval 实现固定间隔轮询的示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;
import java.util.concurrent.TimeoutException; // 用于可能的超时异常

public class PollingWithFluxInterval {

    private final WebClient webClient;
    private final int MAX_ATTEMPTS = 10; // 最大轮询尝试次数
    private final Duration INTERVAL = Duration.ofMillis(100); // 每100毫秒发送一次请求

    public PollingWithFluxInterval(WebClient webClient) {
        this.webClient = webClient;
    }

    /**
     * 模拟从外部系统获取状态,返回一个包含尝试次数和状态就绪情况的报告。
     * 模拟了每次请求的网络延迟。
     */
    private Mono fetchStatus(long count) {
        // 模拟网络请求和处理时间,例如50ms
        return webClient.get()
                        .uri("/api/status")

相关文章

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

325

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

232

2023.10.07

scripterror怎么解决
scripterror怎么解决

scripterror的解决办法有检查语法、文件路径、检查网络连接、浏览器兼容性、使用try-catch语句、使用开发者工具进行调试、更新浏览器和JavaScript库或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

187

2023.10.18

500error怎么解决
500error怎么解决

500error的解决办法有检查服务器日志、检查代码、检查服务器配置、更新软件版本、重新启动服务、调试代码和寻求帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

280

2023.10.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

68

2026.01.16

全民K歌得高分教程大全
全民K歌得高分教程大全

本专题整合了全民K歌得高分技巧汇总,阅读专题下面的文章了解更多详细内容。

127

2026.01.16

C++ 单元测试与代码质量保障
C++ 单元测试与代码质量保障

本专题系统讲解 C++ 在单元测试与代码质量保障方面的实战方法,包括测试驱动开发理念、Google Test/Google Mock 的使用、测试用例设计、边界条件验证、持续集成中的自动化测试流程,以及常见代码质量问题的发现与修复。通过工程化示例,帮助开发者建立 可测试、可维护、高质量的 C++ 项目体系。

54

2026.01.16

java数据库连接教程大全
java数据库连接教程大全

本专题整合了java数据库连接相关教程,阅读专题下面的文章了解更多详细内容。

39

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号