概念和通用 API
— 焉知非鱼Concepts and Common API
概念和通用 API #
Table API 和 SQL 被集成在一个联合 API 中。这个 API 的核心概念是一个 Table,作为查询的输入和输出。本文档介绍了具有 Table API 和 SQL 查询的程序的常用结构,如何注册 Table,如何查询 Table,如何发出 Table。
两种 Planners 的主要区别 #
- Blink 将批处理作业视为流式作业的一种特殊情况。因此,也不支持 Table 和 DataSet 之间的转换,批处理作业不会被翻译成 DateSet 程序,而是翻译成 DataStream 程序,和流作业一样。
- Blink 计划器不支持 BatchTableSource,请使用有界的 StreamTableSource 代替。
- 旧计划器和 Blink 计划器的 FilterableTableSource 的实现是不兼容的。旧的规划者会将 PlannerExpressions 推送到 FilterableTableSource 中,而 Blink 规划者会将 Expressions 推送下去。
- 基于字符串的键值配置选项(详情请看配置文档)只用于 Blink 规划器。
- PlannerConfig 在两个规划器中的实现(CalciteConfig)是不同的。
- Blink 规划师将在 TableEnvironment 和 StreamTableEnvironment 上把多个汇优化成一个 DAG。旧的规划器总是会将每个汇优化成一个新的 DAG,其中所有的 DAG 是相互独立的。
- 现在老的计划器不支持目录统计,而 Blink 计划器支持。
Table API 和 SQL 程序的结构 #
所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例显示了 Table API 和 SQL 程序的共同结构。
// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")
// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...
注意:表 API 和 SQL 查询可以很容易地与 DataStream 或 DataSet 程序集成并嵌入其中。请查看与 DataStream 和 DataSet API 的集成部分,了解如何将 DataStream 和 DataSets 转换为表,反之亦然。
创建一个 TableEnvironment #
TableEnvironment 是 Table API 和 SQL 集成的核心概念。它负责
- 在内部目录(catalog)中注册一个 Table
- 登记目录(catalog)
- 加载可插拔模块
- 执行 SQL 查询
- 注册一个用户定义的(标量、表或聚合)函数
- 将 DataStream 或 DataSet 转换为 Table
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。
一个 Table 总是绑定在一个特定的 TableEnvironment 上。在同一个查询中,不可能将不同 TableEnvironments 的表组合起来,例如,将它们连接或联合起来。
通过调用静态的 BatchTableEnvironment.create()
或 StreamTableEnvironment.create()
方法创建一个 TableEnvironment,其中包含一个 StreamExecutionEnvironment 或 ExecutionEnvironment 和一个可选的 TableConfig。TableConfig 可以用来配置 TableEnvironment 或自定义查询优化和翻译过程(参见 Query Optimization)。
确保选择与你的编程语言相匹配的特定规划器 BatchTableEnvironment/StreamTableEnvironment。
如果这两个规划器 jar 都在 classpath 上(默认行为),你应该明确设置在当前程序中使用哪个规划器。
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
注意:如果在 /lib
目录下只有一个 planner jar,可以使用 AnyPlanner(python 的 use_any_planner)
来创建特定的环境设置。
在目录(Catalog)中创建表 #
一个 TableEnvironment 维护着一个表的目录图,这些表是用一个标识符创建的。每个标识符由 3 部分组成:目录名、数据库名和对象名。如果没有指定目录或数据库,将使用当前的默认值(参见Table 标识符展开部分的例子)。
表可以是虚拟的(VIEWS)或常规的(TABLES)。VIEWS 可以从现有的 Table 对象创建,通常是 Table API 或 SQL 查询的结果。TABLES 描述外部数据,如文件、数据库表或消息队列。
临时表与永久表 #
表可以是临时的,与单个 Flink 会话的生命周期挂钩,也可以是永久的,在多个 Flink 会话和集群中可见。
永久表需要一个目录(如 Hive Metastore)来维护表的元数据。一旦创建了永久表,它对连接到目录的任何 Flink 会话都是可见的,并将继续存在,直到表被显式放弃。
另一方面,临时表总是存储在内存中,并且只在它们创建的 Flink 会话的持续时间内存在。这些表对其他会话不可见。它们不绑定到任何目录或数据库,但可以在一个目录或数据库的命名空间中创建。如果相应的数据库被删除,临时表不会被删除。
Shadowing #
可以用与现有永久表相同的标识符登记一个临时表。只要临时表存在,临时表就会对永久表产生遮盖,使永久表无法访问。所有使用该标识符的查询都将针对临时表执行。
这可能对实验很有用。它允许首先对临时表运行完全相同的查询,例如,只有一个数据子集,或者数据被混淆了。一旦验证了查询的正确性,就可以针对真正的生产表运行。
创建一个 Table #
虚拟表 #
表 API 对象对应于 SQL 术语中的 VIEW(虚拟表)。它封装了一个逻辑查询计划。它可以在一个目录中创建,具体如下。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
注意:Table 对象与关系型数据库系统中的 VIEW 类似,即定义 Table 的查询不进行优化,但当另一个查询引用注册的 Table 时,会被内联。如果多个查询引用同一个注册表,则会对每个引用查询进行内联,并执行多次,即注册表的结果不会被共享。
连接器表 #
也可以从连接器声明中创建一个关系型数据库中已知的 TABLE。连接器描述的是存储表数据的外部系统。这里可以声明 Apacha Kafka 或普通文件系统等存储系统。
DDL
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
扩展 Table 标识符 #
表总是用目录(catalog)、数据库、表名三部分组成的标识符进行注册。
用户可以将其中的一个目录和一个数据库设置为"当前目录"和"当前数据库"。其中,上述 3 部分标识符中的前两部分可以选择,如果不提供,则引用当前目录和当前数据库。用户可以通过表 API 或 SQL 切换当前目录和当前数据库。
标识符遵循 SQL 的要求,这意味着它们可以用反引号符(`)进行转义。
// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
val table: Table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
查询一个 Table #
Table API #
Table API 是 Scala 和 Java 的语言集成查询 API。与 SQL 不同的是,查询不是指定为 Strings,而是在宿主语言中一步步组成。
该 API 基于 Table 类,它表示一个表(流式或批处理),并提供了应用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入的 Table 应用关系操作的结果。有些关系操作由多个方法调用组成,如 table.groupBy(...).select()
,其中 groupBy(...)
指定表的分组,select(...)
是表的分组上的投影。
Table API 文档描述了流式表和批处理表上支持的所有 Table API 操作。
下面的示例显示了一个简单的 Table API 聚合查询。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
.filter($"cCountry" === "FRANCE")
.groupBy($"cID", $"cName")
.select($"cID", $"cName", $"revenue".sum AS "revSum")
// emit or convert Table
// execute query
注意:Scala Table API 使用以美元符号($
)开头的 Scala 字符串插值来引用 Table 的属性。Table API 使用 Scala implicits。请确保导入
org.apache.flink.table.api._
- 用于隐式表达式转换org.apache.flink.api.scala._
和org.apache.flink.table.api.bridge.scala._
,如果你想从 DataStream 转换到 DataStream。
SQL #
Flink 的 SQL 集成是基于 Apache Calcite,它实现了 SQL 标准。SQL 查询被指定为常规 Strings。
SQL 文档描述了 Flink 对流和批处理表的 SQL 支持。
下面的例子展示了如何指定一个查询并将结果以表的形式返回。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
下面的示例显示了如何指定一个更新查询,将其结果插入到注册表中。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
混合 Table API 和 SQL #
表 API 和 SQL 查询可以很容易地混合,因为两者都返回 Table 对象。
- 可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。
- 通过在 TableEnvironment 中注册生成的 Table并在 SQL 查询的 FROM 子句中引用它,可以在 Table API 查询的结果上定义一个 SQL 查询。
发出一个表 #
一个 Table 是通过将其写入 TableSink 而发出的。TableSink 是一个通用接口,它支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息系统(如 Apache Kafka、RabbitMQ)。
批量表只能写入 BatchTableSink,而流式表则需要 AppendStreamTableSink、RetractStreamTableSink 或 UpsertStreamTableSink。
请参阅有关 Table Sources & Sink 的文档,以了解可用的 Sink 的详细信息以及如何实现自定义 TableSink 的说明。
Table.executeInsert(String tableName)
方法将 Table 排放到一个注册的 TableSink 中。该方法通过名称从目录中查找 TableSink,并验证 Table 的模式与 TableSink 的模式是否相同。
下面的示例展示了如何发射 Table。
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an output Table
val schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG())
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable")
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable")
翻译和执行查询 #
两个规划器翻译和执行查询的行为是不同的。
- Blink 计划器
表 API 和 SQL 查询无论其输入是流式还是批处理,都会被翻译成 DataStream 程序。一个查询在内部表示为一个逻辑查询计划,并分两个阶段进行翻译。
- 逻辑计划的优化。
- 翻译成 DataStream 程序。
Table API 或 SQL 查询被翻译时:
TableEnvironment.executeSql()
被调用。这个方法用于执行给定的语句,一旦这个方法被调用,sql 查询就会立即被翻译。Table.executeInsert()
被调用。该方法用于将表的内容插入到给定的 sink 路径中,一旦调用该方法,Table API 立即被翻译。- 调用
Table.execute()
。该方法用于将表内容收集到本地客户端,一旦调用该方法,Table API 立即被翻译。 StatementSet.execute()
被调用。一个 Table(通过StatementSet.addInsert()
向 sink 发出)或一个 INSERT 语句(通过StatementSet.addInsertSql()
指定)将首先在 StatementSet 中被缓冲。一旦StatementSet.execute()
被调用,它们就会被翻译。所有接收器将被优化成一个 DAG。- 当一个表被转换为 DataStream 时,它就会被翻译(参见与 DataStream 和 DataSet API 的集成)。一旦翻译完毕,它就是一个常规的 DataStream 程序,并在调用 StreamExecutionEnvironment.execut()时被执行。
注意: 从 1.11 版本开始,
sqlUpdate()
方法和insertInto()
方法已被废弃。如果 Table 程序是由这两个方法构建的,我们必须使用StreamTableEnvironment.execution()
方法代替StreamExecutionEnvironment.execution()
方法来执行。
与 DataStream 和 DataSet API 的集成 #
两种流上的计划器都可以与 DataStream API 集成,只有老的计划器可以与 DataSet API 集成,批处理的 Blink 计划器不能与两者结合。只有旧的计划器可以与 DataSet API 集成,批处理的 Blink 计划器不能与两者结合。注:下面讨论的 DataSet API 只适用于批处理的旧版规划器。
Table API 和 SQL 查询可以很容易地与 DataStream 和 DataSet 程序集成并嵌入其中。例如,可以查询一个外部表(例如来自 RDBMS),做一些预处理,如过滤、投影、聚合或加入元数据,然后用 DataStream 或 DataSet API(以及建立在这些 API 之上的任何库,如 CEP 或 Gelly)进一步处理数据。反之,也可以在 DataStream 或 DataSet 程序的结果上应用 Table API 或 SQL 查询。
这种交互可以通过将 DataStream 或 DataSet 转换为表来实现,反之亦然。在本节中,我们将描述这些转换是如何完成的。
Scala 隐式转换 #
Scala Table API 为 DataSet、DataStream 和 Table 类提供了隐式转换的功能。这些转换是通过导入包 org.apache.flink.table.api.bridge.scala._
来实现的,此外还可以导入 org.apache.flink.api.scala._
来实现 Scala DataStream API。
从 DataStream 或 DataSet 创建视图 #
DataStream 或 DataSet 可以作为视图在 TableEnvironment 中注册。由此产生的视图的模式取决于注册的 DataStream 或 DataSet 的数据类型。请查看有关数据类型到表模式的映射部分以了解详情。
注意:从 DataStream 或 DataSet 创建的视图只能注册为临时视图。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)
将 DataStream 或 DataSet 转换为 Table #
不需要在 TableEnvironment 中注册一个 DataStream 或 DataSet,也可以直接将其转换为 Table。如果你想在 Table API 查询中使用 Table,这很方便。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields "_1", "_2"
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields "myLong", "myString"
val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
将 Table 转换为 DataStream 或 DataSet #
Table 可以被转换为 DataStream 或 DataSet。通过这种方式,可以在表 API 或 SQL 查询的结果上运行自定义 DataStream 或 DataSet 程序。
当将 Table 转换为 DataStream 或 DataSet 时,您需要指定生成的 DataStream 或 DataSet 的数据类型,即表的行要转换为的数据类型。通常,最方便的转换类型是 Row。下面的列表给出了不同选项的功能概述。
- Row:字段按位置映射,字段数量任意,支持 null 值,无类型安全访问。
- POJO:字段按名称映射(POJO 字段必须与表字段一样命名),任意数量的字段,支持 null 值,类型安全访问。
- Case Class:字段按位置映射,不支持 null 值,类型安全访问。
- Tuple:字段按位置映射,限制为 22 个(Scala)或 25 个(Java)字段,不支持 null 值,类型安全访问。
- 原子类型:表必须有一个字段,不支持空值,类型安全访问。表必须有一个字段,不支持 null 值,类型安全访问。
将 Table 转换为 DataStream #
作为流式查询结果的表将被动态更新,即随着查询输入流中新记录的到达而变化。因此,将这种动态查询转换成的 DataStream 需要对表的更新进行编码。
有两种模式可以将表转换为 DataStream。
- Append 模式。只有当动态 Table 只被 INSERT 修改时,才可以使用这种模式,即只进行追加,之前发出的结果永远不会更新。
- 收回模式。这种模式可以一直使用。它将 INSERT 和 DELETE 更改用布尔标志编码。
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:关于动态表及其属性的详细讨论在动态表文档中给出。
注意: 一旦表转换为 DataStream,请使用 StreamExecutionEnvironment.execute()
方法来执行 DataStream 程序。
将 Table 转换为 DataSet #
Table 转换为 DataStream 的过程如下:
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
注意: 一旦 Table 转换为 DataSet,我们必须使用 ExecutionEnvironment.execute
方法来执行 DataSet 程序。
数据类型到 Table Schema 的映射 #
Flink 的 DataStream 和 DataSet API 支持非常多样化的类型。复合类型,如 Tuples(内置的 Scala 和 Flink Java tuples)、POJOs、Scala case 类和 Flink 的 Row 类型,允许嵌套具有多个字段的数据结构,这些字段可以在 Table 表达式中访问。其他类型被视为原子类型。在下文中,我们将描述 Table API 如何将这些类型转换为内部行表示,并展示将 DataStream 转换为 Table 的例子。
数据类型到 Table Schema 的映射可以通过两种方式进行:基于字段位置或基于字段名。
- 基于位置的映射
基于位置的映射可以用来给字段一个更有意义的名字,同时保持字段顺序。这种映射可用于具有定义字段顺序的复合数据类型以及原子类型。复合数据类型如元组、行和 case 类都有这样的字段顺序。然而,POJO 的字段必须根据字段名进行映射(见下一节)。字段可以被投影出来,但不能使用别名作为重命名。
当定义基于位置的映射时,指定的名称必须不存在于输入数据类型中,否则 API 将假设映射应该基于字段名发生。如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用 f0。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, $"myLong")
// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")
- 基于名称的映射
基于名称的映射可以用于任何数据类型,包括 POJO。它是定义表模式映射的最灵活的方式。映射中的所有字段都是通过名称引用的,并可能使用别名重命名为。字段可以重新排序和投影出来。
如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用 f0。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, $"_2")
// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")
原子类型 #
Flink 将原语(Integer、Double、String)或通用类型(不能分析和分解的类型)视为原子类型。原子类型的 DataStream 或 DataSet 会被转换为具有单一属性的 Table。属性的类型是从原子类型推断出来的,可以指定属性的名称。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[Long] = ...
// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, $"myLong")
Tuples(Scala 和 Java)和 Case 类(仅 Scala)。 #
Flink 支持 Scala 的内置元组,并为 Java 提供了自己的元组类。DataStreams 和 DataSets 这两种元组都可以转换为表。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果没有指定字段名,则使用默认的字段名。如果引用了原始的字段名(对于 Flink Tuples 来说是 f0, f1, …,对于 Scala Tuples 来说是 _1, _2, …),API 会假定映射是基于名称而不是基于位置的。基于名称的映射允许重新排序字段和用别名(as)进行投影。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2")
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
POJO(Java 和 Scala) #
Flink 支持 POJO 作为复合类型。这里记录了确定 POJO 的规则。
当将 POJO DataStream 或 DataSet 转换为 Table 而不指定字段名时,会使用原始 POJO 字段的名称。名称映射需要原始名称,不能通过位置来完成。字段可以使用别名(使用 as 关键字)重命名,重新排序,并进行投影。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
Row #
Row 数据类型支持任意数量的字段和具有 null 值的字段。字段名可以通过 RowTypeInfo 来指定,也可以在将 Row DataStream 或 DataSet 转换为 Table 时指定。Row 类型支持通过位置和名称对字段进行映射。可以通过为所有字段提供名称(基于位置的映射)或单独选择字段进行投影/排序/重命名(基于名称的映射)来重命名字段。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, $"myName", $"myAge")
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName", $"age" as "myAge")
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name")
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")
查询优化 #
- Blink 计划器
Apache Flink 利用并扩展了 Apache Calcite 来执行复杂的查询优化。这包括一系列基于规则和成本的优化,如:
- 基于 Apache Calcite 的子查询装饰相关。
- 投影修剪
- 分区修剪
- 过滤器下推
- 子计划重复复制,避免重复计算。
- 特殊子查询重写,包括两部分。
- 将 IN 和 EXISTS 转换为左半连接。
- 将 NOT IN 和 NOT EXISTS 转换为左反连接。
- 可选的 join 重新排序
- 通过
table.optimizer.join-reorder-enabled
启用。
- 通过
注:IN/EXISTS/NOT IN/NOT EXISTS
目前只支持子查询重写中的连词条件。
优化器做出智能决策,不仅基于计划,还基于数据源提供的丰富统计数据,以及每个操作符(如 io、cpu、网络和内存)的细粒度成本。
高级用户可以通过 CalciteConfig 对象提供自定义优化,该对象可以通过调用 TableEnvironment#getConfig#setPlannerConfig
提供给 table 环境。
解释表 #
Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。这是通过 Table.explain()
方法或 StatementSet.explain()
方法完成的。Table.explain()
返回一个 Table 的计划。StatementSet.explain()
返回多个接收器的计划。它返回一个描述三个计划的字符串。
- 关系查询的抽象语法树,即未优化的逻辑查询计划。
- 优化的逻辑查询计划,以及
- 物理执行计划。
TableEnvironment.explainSql()
和 TableEnvironment.executeSql()
支持执行 EXPLAIN 语句来获取计划,请参考 EXPLAIN 页面。
下面的代码显示了一个使用 Table.explain()
方法给定 Table 的例子和相应的输出。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
.where($"word".like("F%"))
.unionAll(table2)
println(table.explain())
上述例子的结果是:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
下面的代码显示了使用 StatementSet.explain()
方法进行多重接收器计划的一个例子和相应的输出。
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)
val schema = new Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING())
tEnv.connect(new FileSystem("/source/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource1")
tEnv.connect(new FileSystem("/source/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource2")
tEnv.connect(new FileSystem("/sink/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink1")
tEnv.connect(new FileSystem("/sink/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink2")
val stmtSet = tEnv.createStatementSet()
val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.addInsert("MySink1", table1)
val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.addInsert("MySink2", table2)
val explanation = stmtSet.explain()
println(explanation)
多重接收器计划的结果是:
== Abstract Syntax Tree ==
LogicalLegacySink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
ship_strategy : FORWARD
Stage 5 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Stage 9 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 10 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 12 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
Stage 14 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html