快速掌握flink sql基础
00
前言
Flink 作为一个统一的批流处理框架,其 Table API 和 SQL 是高层次的处理 API。尽管当前功能仍在积极开发中,但已经可以支持批流统一处理。Table API 允许在 Java 和 Scala 中使用直观的查询 API,结合关系运算符如 select、filter 和 join 进行查询。而 Flink SQL 则允许直接在代码中编写 SQL 实现查询操作,基于 Apache Calcite 实现 SQL 标准支持。
01
1、导入必要的依赖包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>flink-table-planner 是 Table API 的核心部分,提供运行时环境和执行计划生成;flink-table-api-scala-bridge 则负责 Table API 与 DataStream/DataSet API 之间的连接支持。这些依赖在 IDE 开发环境中需要添加,而在生产环境中,lib 目录通常已包含 planner,只需添加 bridge 即可。如果需要使用自定义函数或连接 Kafka,还需要 flink-table-common 中的 SQL client。
02
2、两种 planner(旧版与 Blink)的区别
Blink 将批处理视为流处理的特殊情况,不支持表与 DataSet 之间的转换,批处理作业直接转换为 DataStream 程序处理。Blink planner 不支持 BatchTableSource,使用有界的 Blink planner 只支持新目录,不支持旧的 ExternalCatalog。旧版 planner 和 Blink planner 在 FilterableTableSource 的实现上不兼容,旧版会将 PlannerExpressions 下推到 filterableTableSource,而 Blink planner 则下推 Expressions。基于字符串的配置选项仅适用于 Blink planner,PlannerConfig 在两种 planner 中实现不同。Blink planner 支持在单个 DAG 中优化多个 sink(仅在 TableEnvironment 中支持),而旧版 planner 则为每个 sink 创建独立的 DAG,不支持目录统计,而 Blink planner 支持。
03
3、表(Table)的概念
TableEnvironment 可以注册 Catalog,并基于 Catalog 注册表,维护 Catalog-Table 映射。表由标识符指定,包含 Catalog 名、数据库名和对象名(表名)。如果未指定目录或数据库,使用当前默认值。
04
4、连接文件系统(Csv 格式)
通过
tableEnv.connect()
05
5、测试案例(新)
需求:从 txt 文件读取数据,过滤掉 id 不为 sensor_1 的数据。
实现思路:首先创建 table 环境,通过 connect 方法读取数据,设置表结构并注册为表,然后进行数据过滤(可使用 SQL 或流处理方式)。
准备数据
sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9
代码实现
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
<p>object FlinkSqlTable {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)</p><pre class="brush:php;toolbar:false;"><pre class="brush:php;toolbar:false;">tableEnv.connect(new FileSystem().path("D:\d12\Flink\FlinkSql\src\main\resources\sensor.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
).createTemporaryTable("inputTable")
val resTable = tableEnv.from("inputTable")
.select("*").filter('id === "sensor_1")
var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")
resTable.toAppendStream[(String, Long, Double)].print("resTable")
resSql.toAppendStream[(String, Long, Double)].print("resSql")
env.execute("FlinkSqlWrodCount")} }
06
6、TableEnvironment 的作用
TableEnvironment 用于注册 Catalog、在内部 Catalog 中注册表、执行 SQL 查询、注册用户自定义函数、保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。创建 TableEnv 时,可以通过 EnvironmentSettings 或 TableConfig 参数配置其特性。
07
7、老版本创建流处理和批处理
7.1
老版本流处理
val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, settings)
7.2
老版本批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3
Blink 版本的流处理环境
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4
Blink 版本的批处理环境
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)
00
总结:
本文介绍了 Flink SQL 的入门操作,后续将分享更多关于 Flink SQL 连接 Kafka、输出到 Kafka、MySQL 等内容。我们下期见~~~
以上就是十分钟入门Fink SQL的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号