0

0

Flink 1.16 Job Manager 重启后消息丢失问题排查及解决

花韻仙語

花韻仙語

发布时间:2025-10-09 13:03:43

|

629人浏览过

|

来源于php中文网

原创

flink 1.16 job manager 重启后消息丢失问题排查及解决

Flink 作业在遇到异常时,会根据配置的重启策略进行自动重启。但如果整个 Job Manager 重启,可能会出现消息丢失的情况。本文旨在帮助你排查和解决 Flink 1.16 中 Job Manager 重启后消息丢失的问题,涵盖了可能的原因和相应的解决方案,确保数据处理的完整性。

问题分析

当 Flink 作业中的某个 Task 遇到异常,并触发配置的重启策略时,Flink 会尝试重启该 Task,恢复到最近一次 checkpoint 的状态,并重新处理 checkpoint 之后的数据。但如果整个 Job Manager 宕机并重启,则情况会更加复杂。以下是一些可能导致消息丢失的原因:

  1. 死循环(Poison Pill): 如果你的数据流中存在无法处理的“毒丸”(Poison Pill)数据,Flink 可能会陷入 fail -> restart -> fail again 的死循环。每次重启后,Flink 都会尝试处理该毒丸数据,导致作业持续失败。

  2. Source 不支持 Checkpointing: 某些 Source 连接器可能不支持 Flink 的 checkpoint 机制。这意味着在 Job Manager 重启后,Source 无法回溯到上次 checkpoint 的位置,从而导致数据丢失

  3. Source 不可回溯: Flink 的容错机制依赖于 Source 的可回溯性。如果 Source 无法回溯,例如从 Socket 或 HTTP 端点读取数据,那么在重启后,将无法重新消费之前未完成的数据。

  4. JobManagerCheckpointStorage: 如果你使用 JobManagerCheckpointStorage,那么 checkpoint 数据存储在 Job Manager 的内存中。当 Job Manager 重启后,这些 checkpoint 数据将会丢失,导致数据丢失。

  5. 集群未配置高可用: 如果你的 Flink 集群没有配置高可用 (HA),那么 Job Manager 宕机后,无法自动恢复,需要手动重启,并且状态数据可能会丢失。

解决方案

针对以上可能的原因,可以采取以下措施来解决 Flink Job Manager 重启后消息丢失的问题:

  1. 处理 Poison Pill 数据:

    • 数据清洗: 在数据进入 Flink 作业之前,进行数据清洗,过滤掉可能导致异常的数据。
    • 错误处理: 在 Flink 作业中,使用 try-catch 块捕获异常,并对异常数据进行特殊处理,例如将其写入死信队列。
    • 跳过错误记录: 使用 side output 将错误记录输出到单独的流,并从主数据流中排除。

    以下代码示例展示了如何使用 try-catch 块处理异常数据:

    DataStream stream = ...;
    
    stream.map(record -> {
        try {
            // 处理 record
            return process(record);
        } catch (Exception e) {
            // 处理异常
            LOG.error("Error processing record: {}", record, e);
            // 可以选择返回一个默认值,或者抛出异常
            return null; // 如果返回 null,需要确保下游操作可以处理 null 值
        }
    }).filter(Objects::nonNull) // 过滤掉 null 值
    .sink(...);
  2. 选择支持 Checkpointing 的 Source:

    笔尖Ai写作
    笔尖Ai写作

    AI智能写作,1000+写作模板,轻松原创,拒绝写作焦虑!一款在线Ai写作生成器

    下载

    尽可能选择支持 Flink checkpoint 机制的 Source 连接器,例如 Apache Kafka Connector, Apache Pulsar Connector 等。这些连接器能够保证在重启后,能够从上次 checkpoint 的位置继续消费数据。

  3. 确保 Source 可回溯:

    如果你的 Source 无法回溯,可以考虑使用 Flink 的 Exactly-Once 语义,结合事务性 Sink,例如 TwoPhaseCommitSinkFunction,来保证数据的完整性。

  4. 使用持久化 Checkpoint 存储:

    不要使用 JobManagerCheckpointStorage,而是选择持久化的 checkpoint 存储,例如 HDFS, S3, RocksDB 等。这样即使 Job Manager 重启,checkpoint 数据也不会丢失。

    在 flink-conf.yaml 中配置 checkpoint 存储:

    state.backend: filesystem
    state.checkpoints.dir: hdfs:///flink/checkpoints
    state.savepoints.dir: hdfs:///flink/savepoints
  5. 配置 Flink 集群高可用:

    配置 Flink 集群的高可用 (HA),确保在 Job Manager 宕机后,能够自动切换到备用的 Job Manager,并从 checkpoint 恢复作业状态。

    关于 HA 的配置,请参考 Flink 官方文档:https://www.php.cn/link/3dd420c40e25463497c9fbaabf8b4621

注意事项

  • 监控: 监控 Flink 作业的运行状态,及时发现并解决问题。
  • 日志: 查看 Flink 的日志,了解作业的运行情况和错误信息。
  • 测试: 在生产环境部署之前,进行充分的测试,确保作业的稳定性和可靠性。
  • 版本: 使用稳定的 Flink 版本,并及时更新到最新的版本。

总结

解决 Flink Job Manager 重启后消息丢失的问题需要综合考虑多个方面,包括数据质量、Source 连接器的选择、checkpoint 存储的选择以及集群的高可用配置。通过合理的配置和优化,可以有效地避免消息丢失,确保 Flink 作业的稳定性和可靠性。

相关专题

更多
kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

Java 大数据处理基础(Hadoop 方向)
Java 大数据处理基础(Hadoop 方向)

本专题聚焦 Java 在大数据离线处理场景中的核心应用,系统讲解 Hadoop 生态的基本原理、HDFS 文件系统操作、MapReduce 编程模型、作业优化策略以及常见数据处理流程。通过实际示例(如日志分析、批处理任务),帮助学习者掌握使用 Java 构建高效大数据处理程序的完整方法。

159

2025.12.08

apache是什么意思
apache是什么意思

Apache是Apache HTTP Server的简称,是一个开源的Web服务器软件。是目前全球使用最广泛的Web服务器软件之一,由Apache软件基金会开发和维护,Apache具有稳定、安全和高性能的特点,得益于其成熟的开发和广泛的应用实践,被广泛用于托管网站、搭建Web应用程序、构建Web服务和代理等场景。本专题为大家提供了Apache相关的各种文章、以及下载和课程,希望对各位有所帮助。

409

2023.08.23

apache启动失败
apache启动失败

Apache启动失败可能有多种原因。需要检查日志文件、检查配置文件等等。想了解更多apache启动的相关内容,可以阅读本专题下面的文章。

930

2024.01.16

http500解决方法
http500解决方法

http500解决方法有检查服务器日志、检查代码错误、检查服务器配置、检查文件和目录权限、检查资源不足、更新软件版本、重启服务器或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

378

2023.11.09

http请求415错误怎么解决
http请求415错误怎么解决

解决方法:1、检查请求头中的Content-Type;2、检查请求体中的数据格式;3、使用适当的编码格式;4、使用适当的请求方法;5、检查服务器端的支持情况。更多http请求415错误怎么解决的相关内容,可以阅读下面的文章。

413

2023.11.14

Golang 性能分析与pprof调优实战
Golang 性能分析与pprof调优实战

本专题系统讲解 Golang 应用的性能分析与调优方法,重点覆盖 pprof 的使用方式,包括 CPU、内存、阻塞与 goroutine 分析,火焰图解读,常见性能瓶颈定位思路,以及在真实项目中进行针对性优化的实践技巧。通过案例讲解,帮助开发者掌握 用数据驱动的方式持续提升 Go 程序性能与稳定性。

8

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
RunnerGo从入门到精通
RunnerGo从入门到精通

共22课时 | 1.7万人学习

尚学堂Mahout视频教程
尚学堂Mahout视频教程

共18课时 | 3.2万人学习

Linux优化视频教程
Linux优化视频教程

共14课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号