今日目标是深入了解flink的高级api,包括flink的四大基石、窗口操作、时间和水印机制以及状态管理。本文将详细介绍这些关键概念和相关操作,帮助你掌握flink的核心功能。
Flink的四大基石
Flink的四大基石包括:
Checkpoint - 检查点:用于实现分布式一致性,解决数据丢失问题,支持故障恢复。检查点存储的是全局状态,并持久化在HDFS分布式文件系统中。
State - 状态:分为托管状态(Managed State)和原始状态(Raw State)。从数据结构的角度来看,托管状态包括ValueState、ListState、MapState和BroadcastState。
Time - 时间:分为事件时间(EventTime)、摄取时间(IngestionTime)和处理时间(ProcessTime)。
Window - 窗口:用于将无界数据转换为有界数据,支持时间窗口和计数窗口,包括滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
窗口操作
窗口操作是Flink处理流数据的关键功能之一。窗口的作用是将动态、无界的数据划定范围,转换为有界、静态的数据进行计算。
为什么需要窗口?
- 数据是动态的、无界的,需要窗口来划定范围,将无界数据转换成有界、静态的数据进行计算。
窗口分类:
-
时间窗口(Time Window):基于时间进行分类,常见的窗口级别包括一天、一小时、一分钟等。
- 滚动窗口(Tumbling Window):窗口时间和滑动时间相同。
- 滑动窗口(Sliding Window):滑动时间小于窗口时间,窗口会重叠。
- 会话窗口(Session Window):基于会话进行分类。
-
计数窗口(Count Window):基于计数进行分类。
- 滚动计数窗口:每达到一定数量进行统计。
- 滑动计数窗口:每达到一定数量进行统计,但窗口会重叠。
如何使用窗口?

时间窗口案例:
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量(基于时间的滚动窗口)。
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量(基于时间的滑动窗口)。
package cn.itcast.flink.basestone; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;public class WindowDemo01 { public static void main(String[] args) throws Exception { //1.env 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
//2.读取 socket 数据源 DataStreamSourcesource = env.socketTextStream("192.168.88.161", 9999); //3.将9,3转为CartInfo(9,3) DataStream mapDS = source.map(new MapFunction () { @Override public CartInfo map(String value) throws Exception { String[] kv = value.split(","); return new CartInfo(kv[0], Integer.parseInt(kv[1])); } }); //4.按照 sensorId 分组并划分滚动窗口为5秒,在窗口上求和 // Tumbling(滚动)Processing(处理)TimeWindows(时间窗口) //需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量 SingleOutputStreamOperator result1 = mapDS.keyBy(t -> t.sensorId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum("count"); //需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量 SingleOutputStreamOperator result2 = mapDS.keyBy(t -> t.sensorId) .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .sum("count"); //5.打印输出 //result1.print(); result2.print(); //6.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
计数窗口案例:
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计(基于数量的滚动窗口)。
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计(基于数量的滑动窗口)。
package cn.itcast.flink.basestone; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CountWindowDemo01 { public static void main(String[] args) throws Exception { //1.env 创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
//2.读取 socket 数据源 DataStreamSourcesource = env.socketTextStream("192.168.88.161", 9999); //3.将9,3转为CartInfo(9,3) DataStream mapDS = source.map(new MapFunction () { @Override public WindowDemo01.CartInfo map(String value) throws Exception { String[] kv = value.split(","); return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1])); } }); // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 // //countWindow(long size, long slide) SingleOutputStreamOperator result1 = mapDS.keyBy(t -> t.getSensorId()) .countWindow(5) .sum("count"); // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 SingleOutputStreamOperator result2 = mapDS.keyBy(t -> t.getSensorId()) .countWindow(5, 3) .sum("count"); //打印输出 //result1.print(); result2.print(); //执行环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
Flink时间 - Time 和水印 - Watermark
时间(Time):
水印机制(Watermark):
水印机制主要用于解决数据延迟和数据乱序问题。水印(时间戳)等于事件时间减去允许的最大延迟时间。窗口触发的条件是水印时间大于等于窗口的结束时间。
需求:
有订单数据,格式为:(订单ID,用户ID,时间戳/事件时间,订单金额)。要求每隔5秒计算5秒内每个用户的订单总金额,并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时3秒)问题。
package cn.itcast.flink.basestone; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.Random; import java.util.UUID;public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置属性 ProcessingTime , 新版本 默认设置 EventTime //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long DataStreamSourcesource = env.addSource(new SourceFunction () { boolean flag = true; Random rm = new Random(); @Override public void run(SourceContext ctx) throws Exception { while (flag) { ctx.collect(new Order( UUID.randomUUID().toString(), rm.nextInt(3), rm.nextInt(101), //模拟生成 Order 数据 事件时间=当前时间-5秒钟随机*1000 System.currentTimeMillis() - rm.nextInt(5) * 1000 )); Thread.sleep(1000); } } @Override public void cancel() { flag = false; } }); //3.Transformation //-告诉Flink要基于事件时间来计算! //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime DataStream result = source.assignTimestampsAndWatermarks( WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, recordTimestamp) -> element.eventTime) ) //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间 //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 .keyBy(t -> t.userId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); //4.Sink result.print(); //5.execute env.execute(); } //创建订单类 @Data @AllArgsConstructor @NoArgsConstructor public static class Order{ private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
自定义重写接口实现水印机制:
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.Random; import java.util.UUID;public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置属性 ProcessingTime , 新版本 默认设置 EventTime //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long DataStreamSourcesource = env.addSource(new SourceFunction () { boolean flag = true; Random rm = new Random(); @Override public void run(SourceContext ctx) throws Exception { while (flag) { ctx.collect(new Order( UUID.randomUUID().toString(), rm.nextInt(3), rm.nextInt(101), //模拟生成 Order 数据 事件时间=当前时间-5秒钟随机*1000 System.currentTimeMillis() - rm.nextInt(5) * 1000 )); Thread.sleep(1000); } } @Override public void cancel() { flag = false; } }); //3.Transformation //-告诉Flink要基于事件时间来计算! //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime DataStream result = source.assignTimestampsAndWatermarks( WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, recordTimestamp) -> element.eventTime) ) //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间 //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 .keyBy(t -> t.userId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); //4.Sink result.print(); //5.execute env.execute(); } //创建订单类 @Data @AllArgsConstructor @NoArgsConstructor public static class Order{ private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
Flink状态管理
状态是基于key或操作算子的中间结果。Flink的状态分为两种:托管状态(Managed State)和原始状态(Raw State)。托管状态又分为基于key的状态(Keyed State)和基于操作的状态(Operator State)。
Keyed State支持的数据结构包括:
- ValueState
- ListState
- MapState
- BroadcastState
Operator State支持的数据结构包括:
- 字节数组
- ListState
Flink Keyed State 案例:
// Keyed State 案例代码
Flink Operator State 案例:
// Operator State 案例代码
通过以上详细介绍和代码示例,你应该能够更好地理解和应用Flink的高级API,掌握窗口操作、时间和水印机制以及状态管理等关键概念。












