十分钟入门Fink SQL

雪夜
发布: 2025-08-30 09:01:15
原创
748人浏览过

十分钟入门fink sql快速掌握flink sql基础

十分钟入门Fink SQL00

前言

Flink 作为一个统一的批流处理框架,其 Table API 和 SQL 是高层次的处理 API。尽管当前功能仍在积极开发中,但已经可以支持批流统一处理。Table API 允许在 Java 和 Scala 中使用直观的查询 API,结合关系运算符如 select、filter 和 join 进行查询。而 Flink SQL 则允许直接在代码中编写 SQL 实现查询操作,基于 Apache Calcite 实现 SQL 标准支持。

十分钟入门Fink SQL01

1、导入必要的依赖包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
登录后复制

flink-table-planner 是 Table API 的核心部分,提供运行时环境和执行计划生成;flink-table-api-scala-bridge 则负责 Table API 与 DataStream/DataSet API 之间的连接支持。这些依赖在 IDE 开发环境中需要添加,而在生产环境中,lib 目录通常已包含 planner,只需添加 bridge 即可。如果需要使用自定义函数或连接 Kafka,还需要 flink-table-common 中的 SQL client。

02

2、两种 planner(旧版与 Blink)的区别

Blink 将批处理视为流处理的特殊情况,不支持表与 DataSet 之间的转换,批处理作业直接转换为 DataStream 程序处理。Blink planner 不支持 BatchTableSource,使用有界的 Blink planner 只支持新目录,不支持旧的 ExternalCatalog。旧版 planner 和 Blink planner 在 FilterableTableSource 的实现上不兼容,旧版会将 PlannerExpressions 下推到 filterableTableSource,而 Blink planner 则下推 Expressions。基于字符串的配置选项仅适用于 Blink planner,PlannerConfig 在两种 planner 中实现不同。Blink planner 支持在单个 DAG 中优化多个 sink(仅在 TableEnvironment 中支持),而旧版 planner 则为每个 sink 创建独立的 DAG,不支持目录统计,而 Blink planner 支持。

03

3、表(Table)的概念

TableEnvironment 可以注册 Catalog,并基于 Catalog 注册表,维护 Catalog-Table 映射。表由标识符指定,包含 Catalog 名、数据库名和对象名(表名)。如果未指定目录或数据库,使用当前默认值。

04

4、连接文件系统(Csv 格式)

通过

tableEnv.connect()
登录后复制
调用 ConnectorDescriptor 来连接外部系统。对于文件系统,使用内置的 FileSystem() connector。

05

5、测试案例(新)

需求:从 txt 文件读取数据,过滤掉 id 不为 sensor_1 的数据。

实现思路:首先创建 table 环境,通过 connect 方法读取数据,设置表结构并注册为表,然后进行数据过滤(可使用 SQL 或流处理方式)。

小门道AI
小门道AI

小门道AI是一个提供AI服务的网站

小门道AI117
查看详情 小门道AI

准备数据

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
登录后复制

代码实现

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
<p>object FlinkSqlTable {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)</p><pre class="brush:php;toolbar:false;"><pre class="brush:php;toolbar:false;">tableEnv.connect(new FileSystem().path("D:\d12\Flink\FlinkSql\src\main\resources\sensor.txt"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("time", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  ).createTemporaryTable("inputTable")

val resTable = tableEnv.from("inputTable")
  .select("*").filter('id === "sensor_1")

var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")

resTable.toAppendStream[(String, Long, Double)].print("resTable")
resSql.toAppendStream[(String, Long, Double)].print("resSql")

env.execute("FlinkSqlWrodCount")
登录后复制

} }

06

6、TableEnvironment 的作用

TableEnvironment 用于注册 Catalog、在内部 Catalog 中注册表、执行 SQL 查询、注册用户自定义函数、保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。创建 TableEnv 时,可以通过 EnvironmentSettings 或 TableConfig 参数配置其特性。

07

7、老版本创建流处理和批处理

7.1

老版本流处理

val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
登录后复制

7.2

老版本批处理

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
登录后复制

7.3

Blink 版本的流处理环境

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
登录后复制

7.4

Blink 版本的批处理环境

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
登录后复制

00

总结:

本文介绍了 Flink SQL 的入门操作,后续将分享更多关于 Flink SQL 连接 Kafka、输出到 Kafka、MySQL 等内容。我们下期见~~~

以上就是十分钟入门Fink SQL的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号