0

0

2021年最新最全Flink系列教程__Flink高级API(四)

絕刀狂花

絕刀狂花

发布时间:2025-09-10 08:07:11

|

700人浏览过

|

来源于php中文网

原创

今日目标是深入了解flink的高级api,包括flink的四大基石、窗口操作、时间和水印机制以及状态管理。本文将详细介绍这些关键概念和相关操作,帮助你掌握flink的核心功能。

Flink的四大基石

Flink的四大基石包括:

  1. Checkpoint - 检查点:用于实现分布式一致性,解决数据丢失问题,支持故障恢复。检查点存储的是全局状态,并持久化在HDFS分布式文件系统中。

  2. State - 状态:分为托管状态(Managed State)和原始状态(Raw State)。从数据结构的角度来看,托管状态包括ValueState、ListState、MapState和BroadcastState。

  3. Time - 时间:分为事件时间(EventTime)、摄取时间(IngestionTime)和处理时间(ProcessTime)。

  4. Window - 窗口:用于将无界数据转换为有界数据,支持时间窗口和计数窗口,包括滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

窗口操作

窗口操作是Flink处理流数据的关键功能之一。窗口的作用是将动态、无界的数据划定范围,转换为有界、静态的数据进行计算。

为什么需要窗口?

  • 数据是动态的、无界的,需要窗口来划定范围,将无界数据转换成有界、静态的数据进行计算。

窗口分类:

  • 时间窗口(Time Window):基于时间进行分类,常见的窗口级别包括一天、一小时、一分钟等。

    • 滚动窗口(Tumbling Window):窗口时间和滑动时间相同。
    • 滑动窗口(Sliding Window):滑动时间小于窗口时间,窗口会重叠。
    • 会话窗口(Session Window):基于会话进行分类。
  • 计数窗口(Count Window):基于计数进行分类。

    • 滚动计数窗口:每达到一定数量进行统计。
    • 滑动计数窗口:每达到一定数量进行统计,但窗口会重叠。

如何使用窗口?

2021年最新最全Flink系列教程__Flink高级API(四)

时间窗口案例:

  • 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量(基于时间的滚动窗口)。

  • 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量(基于时间的滑动窗口)。

package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowDemo01 { public static void main(String[] args) throws Exception { //1.env 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

    //2.读取 socket 数据源
    DataStreamSource source = env.socketTextStream("192.168.88.161", 9999);

    //3.将9,3转为CartInfo(9,3)
    DataStream mapDS = source.map(new MapFunction() {
        @Override
        public CartInfo map(String value) throws Exception {
            String[] kv = value.split(",");
            return new CartInfo(kv[0], Integer.parseInt(kv[1]));
        }
    });

    //4.按照 sensorId 分组并划分滚动窗口为5秒,在窗口上求和
    // Tumbling(滚动)Processing(处理)TimeWindows(时间窗口)
    //需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量
    SingleOutputStreamOperator result1 = mapDS.keyBy(t -> t.sensorId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum("count");

    //需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量
    SingleOutputStreamOperator result2 = mapDS.keyBy(t -> t.sensorId)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
            .sum("count");

    //5.打印输出
    //result1.print();
    result2.print();

    //6.execute
    env.execute();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
    private String sensorId;//信号灯id
    private Integer count;//通过该信号灯的车的数量
}

}

计数窗口案例:

  • 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计(基于数量的滚动窗口)。

  • 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计(基于数量的滑动窗口)。

    BibiGPT-哔哔终结者
    BibiGPT-哔哔终结者

    B站视频总结器-一键总结 音视频内容

    下载
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountWindowDemo01 { public static void main(String[] args) throws Exception { //1.env 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

    //2.读取 socket 数据源
    DataStreamSource source = env.socketTextStream("192.168.88.161", 9999);

    //3.将9,3转为CartInfo(9,3)
    DataStream mapDS = source.map(new MapFunction() {
        @Override
        public WindowDemo01.CartInfo map(String value) throws Exception {
            String[] kv = value.split(",");
            return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
        }
    });

    // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
    //        //countWindow(long size, long slide)
    SingleOutputStreamOperator result1 = mapDS.keyBy(t -> t.getSensorId())
            .countWindow(5)
            .sum("count");

    // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
    SingleOutputStreamOperator result2 = mapDS.keyBy(t -> t.getSensorId())
            .countWindow(5, 3)
            .sum("count");

    //打印输出
    //result1.print();
    result2.print();

    //执行环境
    env.execute();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
    private String sensorId;//信号灯id
    private Integer count;//通过该信号灯的车的数量
}

}

Flink时间 - Time 和水印 - Watermark

时间(Time)

2021年最新最全Flink系列教程__Flink高级API(四)

水印机制(Watermark)

水印机制主要用于解决数据延迟和数据乱序问题。水印(时间戳)等于事件时间减去允许的最大延迟时间。窗口触发的条件是水印时间大于等于窗口的结束时间。

2021年最新最全Flink系列教程__Flink高级API(四)

需求

有订单数据,格式为:(订单ID,用户ID,时间戳/事件时间,订单金额)。要求每隔5秒计算5秒内每个用户的订单总金额,并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时3秒)问题。

package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;

public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
    DataStreamSource source = env.addSource(new SourceFunction() {
        boolean flag = true;
        Random rm = new Random();

        @Override
        public void run(SourceContext ctx) throws Exception {
            while (flag) {
                ctx.collect(new Order(
                    UUID.randomUUID().toString(),
                    rm.nextInt(3),
                    rm.nextInt(101),
                    //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                    System.currentTimeMillis() - rm.nextInt(5) * 1000
                ));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    });

    //3.Transformation
    //-告诉Flink要基于事件时间来计算!
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
    DataStream result = source.assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
    )
        //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
        //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
        .keyBy(t -> t.userId)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum("money");

    //4.Sink
    result.print();

    //5.execute
    env.execute();
}

//创建订单类
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order{
    private String orderId;
    private Integer userId;
    private Integer money;
    private Long eventTime;
}

}

自定义重写接口实现水印机制

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;

public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
    DataStreamSource source = env.addSource(new SourceFunction() {
        boolean flag = true;
        Random rm = new Random();

        @Override
        public void run(SourceContext ctx) throws Exception {
            while (flag) {
                ctx.collect(new Order(
                    UUID.randomUUID().toString(),
                    rm.nextInt(3),
                    rm.nextInt(101),
                    //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                    System.currentTimeMillis() - rm.nextInt(5) * 1000
                ));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    });

    //3.Transformation
    //-告诉Flink要基于事件时间来计算!
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
    DataStream result = source.assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
    )
        //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
        //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
        .keyBy(t -> t.userId)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum("money");

    //4.Sink
    result.print();

    //5.execute
    env.execute();
}

//创建订单类
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order{
    private String orderId;
    private Integer userId;
    private Integer money;
    private Long eventTime;
}

}

Flink状态管理

状态是基于key或操作算子的中间结果。Flink的状态分为两种:托管状态(Managed State)和原始状态(Raw State)。托管状态又分为基于key的状态(Keyed State)和基于操作的状态(Operator State)。

Keyed State支持的数据结构包括:

  • ValueState
  • ListState
  • MapState
  • BroadcastState

Operator State支持的数据结构包括:

  • 字节数组
  • ListState

Flink Keyed State 案例

// Keyed State 案例代码

Flink Operator State 案例

// Operator State 案例代码

通过以上详细介绍和代码示例,你应该能够更好地理解和应用Flink的高级API,掌握窗口操作、时间和水印机制以及状态管理等关键概念。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

322

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

231

2023.10.07

counta和count的区别
counta和count的区别

Count函数用于计算指定范围内数字的个数,而CountA函数用于计算指定范围内非空单元格的个数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

196

2023.11.20

session失效的原因
session失效的原因

session失效的原因有会话超时、会话数量限制、会话完整性检查、服务器重启、浏览器或设备问题等等。详细介绍:1、会话超时:服务器为Session设置了一个默认的超时时间,当用户在一段时间内没有与服务器交互时,Session将自动失效;2、会话数量限制:服务器为每个用户的Session数量设置了一个限制,当用户创建的Session数量超过这个限制时,最新的会覆盖最早的等等。

307

2023.10.17

session失效解决方法
session失效解决方法

session失效通常是由于 session 的生存时间过期或者服务器关闭导致的。其解决办法:1、延长session的生存时间;2、使用持久化存储;3、使用cookie;4、异步更新session;5、使用会话管理中间件。

729

2023.10.18

cookie与session的区别
cookie与session的区别

本专题整合了cookie与session的区别和使用方法等相关内容,阅读专题下面的文章了解更详细的内容。

88

2025.08.19

treenode的用法
treenode的用法

​在计算机编程领域,TreeNode是一种常见的数据结构,通常用于构建树形结构。在不同的编程语言中,TreeNode可能有不同的实现方式和用法,通常用于表示树的节点信息。更多关于treenode相关问题详情请看本专题下面的文章。php中文网欢迎大家前来学习。

533

2023.12.01

C++ 高效算法与数据结构
C++ 高效算法与数据结构

本专题讲解 C++ 中常用算法与数据结构的实现与优化,涵盖排序算法(快速排序、归并排序)、查找算法、图算法、动态规划、贪心算法等,并结合实际案例分析如何选择最优算法来提高程序效率。通过深入理解数据结构(链表、树、堆、哈希表等),帮助开发者提升 在复杂应用中的算法设计与性能优化能力。

17

2025.12.22

c++主流开发框架汇总
c++主流开发框架汇总

本专题整合了c++开发框架推荐,阅读专题下面的文章了解更多详细内容。

80

2026.01.09

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.4万人学习

C# 教程
C# 教程

共94课时 | 6.5万人学习

Java 教程
Java 教程

共578课时 | 44.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号