Table API
— 焉知非鱼Table API
Table API
表 API 是一个统一的关系型 API,用于流处理和批处理。Table API 查询可以在批处理或流处理输入上运行,无需修改。Table API 是 SQL 语言的超级集,是专门为 Apache Flink 工作而设计的。Table API 是 Scala、Java 和 Python 的语言集成 API。Table API 查询不是像 SQL 那样以字符串值的方式指定查询,而是以语言嵌入的方式在 Java、Scala 或 Python 中定义查询,并支持自动完成和语法验证等 IDE。
Table API 与 Flink 的 SQL 集成共享许多概念和部分 API。请看通用概念和 API,了解如何注册表或创建表对象。流概念页面讨论了流的具体概念,如动态表和时间属性。
下面的例子假设一个名为 Orders 的注册表具有属性(a, b, c, rowtime)。rowtime 字段在流式中是一个逻辑时间属性,在批处理中是一个常规的时间戳字段。
概述和示例 #
Table API 可用于 Scala、Java 和 Python。Scala Table API 利用的是 Scala 表达式,Java Table API 既支持 Expression DSL,也支持解析并转换为等价表达式的字符串,Python Table API 目前只支持解析并转换为等价表达式的字符串。
下面的例子显示了 Scala、Java 和 Python Table API 之间的差异。表程序是在批处理环境中执行的。它扫描 Orders 表,按字段 a 分组,并计算每组的结果行。
通过导入 org.apache.flink.table.api._
、org.apache.flink.api.scala._
和 org.apache.flink.table.api.bridge.scala._
(用于桥接到/来自 DataStream)来启用 Scala Table API。
下面的例子展示了 Scala Table API 程序是如何构造的。表字段使用 Scala 的字符串插值,使用美元字符($
)引用。
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
// register Orders table in table environment
// ...
// specify table program
val orders = tEnv.from("Orders") // schema (a, b, c, rowtime)
val result = orders
.groupBy($"a")
.select($"a", $"b".count as "cnt")
.toDataSet[Row] // conversion to DataSet
.print()
下一个例子显示了一个更复杂的 Table API 程序。该程序再次扫描 Orders 表,过滤空值,对类型为 String 的字段 a 进行归一化处理,并为每个小时和产品 a 计算平均计费金额 b。它过滤空值,对类型为 String 的字段 a 进行标准化,并为每个小时和产品 a 计算平均账单金额 b。
// environment configuration
// ...
// specify table program
val orders: Table = tEnv.from("Orders") // schema (a, b, c, rowtime)
val result: Table = orders
.filter($"a".isNotNull && $"b".isNotNull && $"c".isNotNull)
.select($"a".lowerCase() as "a", $"b", $"rowtime")
.window(Tumble over 1.hour on $"rowtime" as "hourlyWindow")
.groupBy($"hourlyWindow", $"a")
.select($"a", $"hourlyWindow".end as "hour", $"b".avg as "avgBillingAmount")
由于表 API 是针对批处理和流数据的统一 API,所以这两个示例程序都可以在批处理和流输入上执行,而不需要对表程序本身进行任何修改。在这两种情况下,考虑到流式记录不会迟到,程序会产生相同的结果(详见流概念)。
操作 #
表 API 支持以下操作。请注意,并不是所有的操作都能在批处理和流式处理中使用,它们都有相应的标签。
Scan, Projection 和 Filter #
- From(Batch/Streaming)
类似于 SQL 查询中的 FROM 子句。执行对注册的表的扫描。
val orders: Table = tableEnv.from("Orders")
- Values(Batch/Streaming)
类似于 SQL 查询中的 VALUES 子句。从提供的行中产生一个内联表。
你可以使用 row(...)
表达式来创建复合行。
val table = tEnv.fromValues(
row(1, "ABC"),
row(2L, "ABCDE")
)
将产生一个模式(schema)如下的表。
root
|-- f0: BIGINT NOT NULL // original types INT and BIGINT are generalized to BIGINT
|-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized
// to VARCHAR(5). VARCHAR is used instead of CHAR so that
// no padding is applied
该方法将从输入的表达式中自动得出类型,如果某个位置的类型不同,该方法将尝试为所有类型找到共同的超级类型。如果某个位置的类型不同,方法将尝试为所有类型找到一个共同的超级类型。如果一个共同的超级类型不存在,将抛出一个异常。
您也可以明确地指定请求的类型。这可能对分配更多的通用类型(如 DECIMAL)或为列命名很有帮助。
val table = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "ABC"),
row(2L, "ABCDE")
)
将产生一个模式(schema)如下的表。
root
|-- id: DECIMAL(10, 2)
|-- name: STRING
- Select(Batch/Streaming)
类似于 SQL SELECT 语句。执行选择操作。
val orders: Table = tableEnv.from("Orders")
val result = orders.select($"a", $"c" as "d")
你可以使用星号(*
)作为通配符,选择表中所有的列。
val orders: Table = tableEnv.from("Orders")
val result = orders.select($"*")
- As(Batch/Streaming)
重新命名字段。
val orders: Table = tableEnv.from("Orders").as("x", "y", "z", "t")
- Where / Filter(Batch/Streaming)
类似于 SQL WHERE 子句。过滤掉没有通过过滤谓词的记录。
val orders: Table = tableEnv.from("Orders")
val result = orders.filter($"a" % 2 === 0)
或:
val orders: Table = tableEnv.from("Orders")
val result = orders.where($"b" === "red")
列操作 #
- AddColumns(Batch/Streaming)
执行字段添加操作。如果添加的字段已经存在,它将抛出一个异常。
val orders = tableEnv.from("Orders");
val result = orders.addColumns(concat($"c", "Sunny"))
- AddOrReplaceColumns(Batch/Streaming)
执行字段添加操作。如果添加的列名与现有的列名相同,那么现有的字段将被替换。此外,如果添加的字段名与现有字段名重复,则使用最后一个字段名。
val orders = tableEnv.from("Orders");
val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc")
- DropColumns(Batch/Streaming)
执行字段删除操作。字段表达式应该是字段引用表达式,并且只能删除现有的字段。
val orders = tableEnv.from("Orders");
val result = orders.dropColumns($"b", $"c")
- RenameColumns(Batch/Streaming)
执行字段重命名操作。字段表达式应为别名表达式,且只能对现有字段进行重命名。
val orders = tableEnv.from("Orders");
val result = orders.renameColumns($"b" as "b2", $"c" as "c2")
聚合(Aggregations) #
- GroupBy 聚合(Batch/Streaming/Result Updating)
类似于 SQL 的 GROUP BY 子句。将分组键上的行与下面的运行聚合操作符进行分组,以分组方式聚合行。
val orders: Table = tableEnv.from("Orders")
val result = orders.groupBy($"a").select($"a", $"b".sum().as("d"))
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,这取决于聚合的类型和不同分组键的数量。请提供有效的保留时间间隔的查询配置,以防止状态大小过大。详见查询配置。
- GroupBy 窗口聚合(Batch/Streaming)
在分组窗口和可能的一个或多个分组键上对一个表进行分组和聚合。
val orders: Table = tableEnv.from("Orders")
val result: Table = orders
.window(Tumble over 5.minutes on $"rowtime" as "w") // define window
.groupBy($"a", $"w") // group by key and window
.select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".sum as "d") // access window properties and aggregate
- Over 窗口聚合(Streaming)
类似于 SQL OVER 子句。根据前后记录的窗口(范围),为每条记录计算 OVER 窗口汇总。更多细节请参见 over 窗口部分。
val orders: Table = tableEnv.from("Orders")
val result: Table = orders
// define window
.window(
Over
partitionBy $"a"
orderBy $"rowtime"
preceding UNBOUNDED_RANGE
following CURRENT_RANGE
as "w")
.select($"a", $"b".avg over $"w", $"b".max().over($"w"), $"b".min().over($"w")) // sliding aggregate
注意:所有的聚合必须定义在同一个窗口上,即相同的分区、排序和范围。目前,只支持对 CURRENT ROW 范围的 PRECEDING(UNBOUNDED 和 bounded)窗口。还不支持带 FOLLOWING 的范围。ORDER BY 必须在单个时间属性上指定。
- Distinct 聚合(Batch Streaming/Result Updating)
类似于 SQL 的 DISTINCT AGGREGATION 子句,如 COUNT(DISTINCT a)
。Distinct 聚合声明一个聚合函数(内置的或用户定义的)只应用在不同的输入值上,Distinct 可以应用于 GroupBy 聚合,GroupBy 窗口聚合和 Over 窗口聚合。Distinct 可以应用于 GroupBy 聚合、GroupBy 窗口聚合和 Over 窗口聚合。
val orders: Table = tableEnv.from("Orders");
// Distinct aggregation on group by
val groupByDistinctResult = orders
.groupBy($"a")
.select($"a", $"b".sum.distinct as "d")
// Distinct aggregation on time window group by
val groupByWindowDistinctResult = orders
.window(Tumble over 5.minutes on $"rowtime" as "w").groupBy($"a", $"w")
.select($"a", $"b".sum.distinct as "d")
// Distinct aggregation on over window
val result = orders
.window(Over
partitionBy $"a"
orderBy $"rowtime"
preceding UNBOUNDED_RANGE
as $"w")
.select($"a", $"b".avg.distinct over $"w", $"b".max over $"w", $"b".min over $"w")
用户自定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果只计算不同值的聚合结果,只需在聚合函数中添加 distinct 修饰符即可。
val orders: Table = tEnv.from("Orders");
// Use distinct aggregation for user-defined aggregate functions
val myUdagg = new MyUdagg();
orders.groupBy($"users").select($"users", myUdagg.distinct($"points") as "myDistinctResult");
注意:对于流式查询,计算查询结果所需的状态可能会根据不同字段的数量而无限增长。请提供一个有效的保留时间间隔的查询配置,以防止过大的状态大小。详情请看查询配置。
- Distinct(Batch Streaming/Result Updating)
类似于 SQL 的 DISTINCT 子句。返回具有不同值组合的记录。
val orders: Table = tableEnv.from("Orders")
val result = orders.distinct()
注意:对于流式查询,计算查询结果所需的状态可能会根据不同字段的数量而无限增长。请提供一个有效的保留时间间隔的查询配置,以防止过大的状态大小。如果启用了状态清洗功能,Distinct 必须发出消息,以防止下游操作者过早地驱逐状态,从而使 Distinct 包含结果更新。详见查询配置。
Joins #
- Inner Join(Batch/Streaming)
类似于 SQL JOIN 子句。连接两个表。两个表必须有不同的字段名,并且必须通过 join 操作符或使用 where 或 filter 操作符定义至少一个平等连接谓词。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"d", $"e", $"f")
val result = left.join(right).where($"a" === $"d").select($"a", $"b", $"e")
注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置。
- Outer Join(Batch/Streaming/Result Updating)
类似于 SQL LEFT/RIGHT/FULL OUTER JOIN 子句。连接两个表。两个表必须有不同的字段名,并且必须定义至少一个平等连接谓词。
val left = tableEnv.fromDataSet(ds1, $"a", $"b", $"c")
val right = tableEnv.fromDataSet(ds2, $"d", $"e", $"f")
val leftOuterResult = left.leftOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val rightOuterResult = left.rightOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
val fullOuterResult = left.fullOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e")
注意:对于流式查询,计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供一个具有有效保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置。
- Interval Join(Batch/Streaming)
注:区间连接是常规连接的一个子集,可以用流式处理。
一个区间连接至少需要一个等价连接谓词和一个连接条件,以限制双方的时间。这样的条件可以由两个合适的范围谓词(<,<=,>=,>)或一个比较两个输入表的相同类型的时间属性(即处理时间或事件时间)的单一平等谓词来定义。
例如,以下谓词是有效的区间连接条件。
$"ltime" === $"rtime"
$"ltime" >= $"rtime" && $"ltime" < $"rtime" + 10.minutes
val left = ds1.toTable(tableEnv, $"a", $"b", $"c", $"ltime".rowtime)
val right = ds2.toTable(tableEnv, $"d", $"e", $"f", $"rtime".rowtime)
val result = left.join(right)
.where($"a" === $"d" && $"ltime" >= $"rtime" - 5.minutes && $"ltime" < $"rtime" + 10.minutes)
.select($"a", $"b", $"e", $"ltime")
- Inner Join with Table Function (UDTF)(Batch/Streaming)
用表格函数的结果连接一个表格。左表(外表)的每条记录都与相应的表函数调用所产生的所有记录合并。如果左(外)表的表函数调用返回的结果是空的,则放弃该表的某行。
// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()
// join
val result: Table = table
.joinLateral(split($"c") as ("s", "t", "v"))
.select($"a", $"b", $"s", $"t", $"v")
- Left Outer Join with Table Function (UDTF)(Batch/Streaming)
用表格函数的结果连接一个表格。左表(外表)的每一行都与相应的表函数调用所产生的所有行相连接。如果表函数调用返回的结果为空,则保留相应的外侧行,并将结果用空值填充。
注意:目前,表函数左外侧连接的谓词只能是空或字面为真。
// instantiate User-Defined Table Function
val split: TableFunction[_] = new MySplitUDTF()
// join
val result: Table = table
.leftOuterJoinLateral(split($"c") as ("s", "t", "v"))
.select($"a", $"b", $"s", $"t", $"v")
- Join with Temporal Table(Streaming)
时间表是跟踪其随时间变化的表。
时间表函数提供了对时间表在特定时间点的状态的访问。用时态表函数连接表的语法与带表函数的内部连接中的语法相同。
目前只支持与时态表的内联接。
val ratesHistory = tableEnv.from("RatesHistory")
// register temporal table function with a time attribute and primary key
val rates = ratesHistory.createTemporalTableFunction($"r_proctime", $"r_currency")
// join with "Orders" based on the time attribute and key
val orders = tableEnv.from("Orders")
val result = orders
.joinLateral(rates($"o_rowtime"), $"r_currency" === $"o_currency")
更多信息请查看更详细的时间表概念说明。
集合操作 #
- Union(Batch)
类似于 SQL UNION 子句。将两个表联合起来,去除重复记录,两个表必须有相同的字段类型。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.union(right)
- UnionAll(Batch/Streaming)
类似于 SQL UNION ALL 子句。联合两个表,两个表的字段类型必须相同。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.unionAll(right)
- Intersect(Batch)
类似于 SQL INTERSECT 子句。Intersect 子句返回的是两个表中都存在的记录。如果一条记录在一个表或两个表中存在一次以上,则只返回一次,即结果表没有重复记录。两个表的字段类型必须相同。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"e", $"f", $"g")
val result = left.intersect(right)
- IntersectAll(Batch)
类似于 SQL 的 INTERSECT ALL 子句。IntersectAll 子句返回两个表中都存在的记录。如果一条记录在两张表中都存在一次以上,那么就会按照它在两张表中存在的次数来返回,也就是说,得到的表可能有重复的记录。两个表的字段类型必须相同。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"e", $"f", $"g")
val result = left.intersectAll(right)
- Minus(Batch)
类似于 SQL EXCEPT 子句。Minus 返回左表中不存在于右表中的记录。左表中的重复记录只返回一次,即删除重复记录。两个表的字段类型必须相同。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.minus(right)
- MinusAll(Batch)
类似于 SQL EXCEPT ALL 子句。MinusAll 子句返回右表中不存在的记录。一条记录在左表中出现 n 次,在右表中出现 m 次,则返回(n - m)次,即删除右表中存在的重复记录。两个表的字段类型必须相同。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a", $"b", $"c")
val result = left.minusAll(right)
- In(Batch/Streaming)
类似于 SQL 的 IN 子句。如果一个表达式存在于给定的表子查询中,In 子句返回 true。子查询表必须由一列组成。该列必须与表达式具有相同的数据类型。
val left = ds1.toTable(tableEnv, $"a", $"b", $"c")
val right = ds2.toTable(tableEnv, $"a")
val result = left.select($"a", $"b", $"c").where($"a".in(right))
注意:对于流式查询,该操作被重写为加入和分组操作。计算查询结果所需的状态可能会根据不同输入行的数量而无限增长。请提供有效的保留时间间隔的查询配置,以防止状态大小过大。详情请看查询配置。
OrderBy, Offset 和 Fetch #
- Order By(Batch)
类似于 SQL ORDER BY 子句。返回所有平行分区的全局排序记录。
val in = ds.toTable(tableEnv, $"a", $"b", $"c")
val result = in.orderBy($"a".asc)
- Offset 和 Fetch(Batch)
类似于 SQL 的 OFFSET 和 FETCH 子句。Offset 和 Fetch 限制了排序结果中返回的记录数量。Offset 和 Fetch 在技术上是 Order By 操作符的一部分,因此必须在它前面。
val in = ds.toTable(tableEnv, $"a", $"b", $"c")
// returns the first 5 records from the sorted result
val result1: Table = in.orderBy($"a".asc).fetch(5)
// skips the first 3 records and returns all following records from the sorted result
val result2: Table = in.orderBy($"a".asc).offset(3)
// skips the first 10 records and returns the next 5 records from the sorted result
val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5)
Insert #
- Insert Into(Batch/Streaming)
类似于 SQL 查询中的 INSERT INTO
子句,该方法执行插入到一个注册的输出表中。executeInsert()
方法将立即提交一个执行插入操作的 Flink 作业。
输出表必须在 TableEnvironment 中注册(见连接器表)。此外,注册表的模式必须与查询的模式相匹配。
val orders: Table = tableEnv.from("Orders")
orders.executeInsert("OutOrders")
Group 窗口 #
分组窗口根据时间或行数间隔将分组行汇总成有限的组,每组评估一次汇总函数。对于批处理表来说,窗口是按时间间隔对记录进行分组的便捷捷径。
窗口是使用 window(w: GroupWindow)子句定义的,并且需要一个别名,这个别名是使用 as 子句指定的。为了通过窗口对表进行分组,必须在 groupBy(…)子句中像常规分组属性一样引用窗口别名。下面的例子展示了如何在表上定义窗口聚合。
val table = input
.window([w: GroupWindow] as $"w") // define window with alias w
.groupBy($"w") // group the table by window w
.select($"b".sum) // aggregate
在流式环境中,只有当窗口聚合除窗口外还对一个或多个属性进行分组时,才能并行计算,即 groupBy(…)子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名的 groupBy(…) 子句(如上面的例子)只能由单个非并行任务来评估。下面的示例显示了如何定义具有附加分组属性的窗口聚合。
val table = input
.window([w: GroupWindow] as $"w") // define window with alias w
.groupBy($"w", $"a") // group the table by attribute a and window w
.select($"a", $"b".sum) // aggregate
窗口属性,如时间窗口的开始、结束或行时间戳,可以在选择语句中作为窗口别名的属性,分别添加为 w.start、w.end 和 w.rowtime。窗口开始时间和行时间时间戳是包含的窗口下界和上界。相反,窗口结束时间戳是专属的上层窗口边界。例如一个从下午 2 点开始的 30 分钟的翻滚窗口,其起始时间戳为 14:00:00.000,行时时间戳为 14:29:59.999,结束时间戳为 14:30:00.000。
val table = input
.window([w: GroupWindow] as $"w") // define window with alias w
.groupBy($"w", $"a") // group the table by attribute a and window w
.select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count) // aggregate and add window start, end, and rowtime timestamps
窗口参数定义了如何将行映射到窗口。Window 不是一个用户可以实现的接口。相反,Table API 提供了一组具有特定语义的预定义 Window 类,它们被翻译成底层的 DataStream 或 DataSet 操作。下面列出了支持的窗口定义。
滚动窗口 #
滚动窗口将行分配到固定长度的非重叠的连续窗口。例如,5 分钟的滚动窗口以 5 分钟的间隔将行分组。滚动窗口可以在事件时间、处理时间或行数上定义。
滚动窗口可以通过使用 Tumble 类来定义,具体如下。
方法 | 描述 |
---|---|
over | 定义窗口的长度,可以是时间或行数间隔。 |
on | 要对其进行分组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是一个声明的事件时间或处理时间时间属性。 |
as | 为窗口指定一个别名。该别名用于在下面的 groupBy()子句中引用窗口,并在 select()子句中选择窗口属性,如窗口开始、结束或行时间戳。 |
// Tumbling Event-time Window
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on $"proctime" as $"w")
Slide (滑动窗口) #
滑动窗口有一个固定的尺寸,并按指定的滑动间隔滑动,如果滑动间隔小于窗口尺寸,滑动窗口就会重叠。如果滑动间隔小于窗口大小,滑动窗口就会重叠。因此,行可以分配给多个窗口。例如,一个 15 分钟大小和 5 分钟滑动间隔的滑动窗口将每行分配到 3 个不同的 15 分钟大小的窗口,这些窗口以 5 分钟的间隔进行评估。滑动窗口可以在事件时间、处理时间或行数上定义。
滑动窗口是通过使用 Slide 类来定义的,具体如下。
方法 | 描述 |
---|---|
over | 定义窗口的长度,可以是时间或行数间隔。 |
every | 定义滑动间隔,可以是时间间隔或行数间隔。缩放间隔的类型必须与尺寸间隔相同。 |
on | 要对其进行分组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是一个声明的事件时间或处理时间时间属性。 |
as | 为窗口指定一个别名。该别名用于在下面的 groupBy()子句中引用窗口,并在 select()子句中选择窗口属性,如窗口开始、结束或行时间戳。 |
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")
Session (会话窗口) #
会话窗口没有固定的大小,但它们的界限是由不活动的时间间隔来定义的,也就是说,如果在定义的间隙期没有事件出现,会话窗口就会被关闭。例如,有 30 分钟间隔的会话窗口在 30 分钟不活动后观察到一行时开始(否则该行将被添加到现有的窗口中),如果在 30 分钟内没有行被添加,则关闭。会话窗口可以在事件时间或处理时间工作。
通过使用 Session 类定义会话窗口,如下所示。
方法 | 描述 |
---|---|
withGap | 将两个窗口之间的间隔定义为时间间隔。 |
on | 要对其进行分组(时间间隔)或排序(行数)的时间属性。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是一个声明的事件时间或处理时间时间属性。 |
as | 为窗口指定一个别名。该别名用于在下面的 groupBy()子句中引用窗口,并在 select()子句中选择窗口属性,如窗口开始、结束或行时间戳。 |
// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on $"proctime" as $"w")
Over 窗口 #
Over 窗口聚合是从标准 SQL(over 子句)中得知的,并在查询的 SELECT 子句中定义。与组窗口不同的是,组窗口是在 GROUP BY 子句中指定的,over 窗口不折叠行。相反,over 窗口聚合计算的是每条输入行在其相邻行的范围内的聚合。
Over 窗口是使用 window(w: OverWindow*)子句来定义的(在 Python API 中使用 over_window(*OverWindow)),并且在 select()方法中通过别名来引用。下面的例子展示了如何在表上定义一个 over
窗口聚合。
val table = input
.window([w: OverWindow] as $"w") // define over window with alias w
.select($"a", $"b".sum over $"w", $"c".min over $"w") // aggregate over the over window w
OverWindow 定义了计算汇总的行的范围。OverWindow 不是一个用户可以实现的接口。相反,Table API 提供了 Over 类来配置 over 窗口的属性。Over 窗口可以在事件时间或处理时间上定义,也可以在指定为时间间隔或行数的范围上定义。支持的 over 窗口定义是以 Over(和其他类)上的方法暴露出来的,下面列出了这些方法。
方法 | Required | 描述 |
---|---|---|
partitionBy | Optional | 定义输入的一个或多个属性的分区。每个分区都被单独排序,聚合函数被分别应用到每个分区。注意:在流环境中,只有当窗口包含 partitionBy 子句时,才能并行计算 over window aggregates。如果没有 partitionBy(…),流就会被一个单一的、非并行的任务处理。 |
orderBy | Required | 定义每个分区中行的顺序,从而定义聚合函数应用到行的顺序。注意:对于流式查询,这必须是一个声明的事件时间或处理时间时间属性。目前,只支持单个排序属性。 |
preceding | Optional | 定义包含在窗口中并在当前行之前的行的间隔。这个间隔可以指定为时间间隔或行数间隔。有边界的窗口用间隔的大小来指定,例如,时间间隔为 10.分钟,行数间隔为 10.行。无边界的窗口用一个常数来指定,例如,时间间隔为 UNBOUNDED_RANGE,行数间隔为 UNBOUNDED_ROW。未绑定的窗口从分区的第一行开始。如果省略了前面的子句,UNBOUNDED_RANGE 和 CURRENT_RANGE 被用作窗口的默认前后。 |
following | Optional | 定义包含在窗口中并跟随当前行的行的窗口间隔。这个间隔必须与前一个间隔的单位(时间或行数)相同。目前,不支持在当前行之后添加行的窗口。您可以指定两个常量中的一个。CURRENT_ROW 将窗口的上界设置为当前行。CURRENT_RANGE 将窗口的上界设置为当前行的排序键,也就是说,所有与当前行具有相同排序键的行都包含在窗口中。如果省略下面的子句,时间间隔窗口的上界定义为 CURRENT_RANGE,行数间隔窗口的上界定义为 CURRENT_ROW。 |
as | Required | 为 over 窗口指定一个别名。该别名用于在下面的 select()子句中引用 over 窗口。 |
注意:目前,在同一个 select()调用中,所有的聚合函数都必须在同一个 over 窗口中计算。
Unbounded Over Windows #
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as "w")
// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as "w")
// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as "w")
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as "w")
Bounded Over Windows #
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as "w")
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as "w")
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as "w")
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as "w")
基于行的操作 #
基于行的操作产生多列的输出。
- Map(Batch/Streaming)
使用用户定义的标量函数或内置的标量函数执行 map 操作。如果输出类型是复合类型,则输出将被扁平化。
class MyMapFunction extends ScalarFunction {
def eval(a: String): Row = {
Row.of(a, "pre-" + a)
}
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(Types.STRING, Types.STRING)
}
val func = new MyMapFunction()
val table = input
.map(func($"c")).as("a", "b")
- FlatMap(Batch/Streaming)
用表格函数执行 flatMap
操作。
class MyFlatMapFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
if (str.contains("#")) {
str.split("#").foreach({ s =>
val row = new Row(2)
row.setField(0, s)
row.setField(1, s.length)
collect(row)
})
}
}
override def getResultType: TypeInformation[Row] = {
Types.ROW(Types.STRING, Types.INT)
}
}
val func = new MyFlatMapFunction
val table = input
.flatMap(func($"c")).as("a", "b")
- Aggregate(Batch/Streaming/Result Updating)
用一个聚合函数执行一个聚合操作。必须用 select 语句关闭"聚合",select 语句不支持聚合函数。如果输出类型是复合类型,聚合的输出将被扁平化。
case class MyMinMaxAcc(var min: Int, var max: Int)
class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
if (value < acc.min) {
acc.min = value
}
if (value > acc.max) {
acc.max = value
}
}
override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0)
def resetAccumulator(acc: MyMinMaxAcc): Unit = {
acc.min = 0
acc.max = 0
}
override def getValue(acc: MyMinMaxAcc): Row = {
Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
}
override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(Types.INT, Types.INT)
}
}
val myAggFunc = new MyMinMax
val table = input
.groupBy($"key")
.aggregate(myAggFunc($"a") as ("x", "y"))
.select($"key", $"x", $"y")
- Group 窗口聚合(Batch/Streaming)
在分组窗口和可能的一个或多个分组键上对一个表进行分组和聚合。你必须用 select 语句关闭"聚合"。而且选择语句不支持 “*” 或聚合函数。
val myAggFunc = new MyMinMax
val table = input
.window(Tumble over 5.minutes on $"rowtime" as "w") // define window
.groupBy($"key", $"w") // group by key and window
.aggregate(myAggFunc($"a") as ("x", "y"))
.select($"key", $"x", $"y", $"w".start, $"w".end) // access window properties and aggregate results
- FlatAggregate(Streaming/Result Updating)
类似于 GroupBy 聚合。将分组键上的行与下面的运行表聚合运算符进行分组,将行进行分组。与 AggregateFunction 的不同之处在于,TableAggregateFunction 可以为一个组返回 0 条或多条记录。你必须用 select 语句关闭 “flatAggregate”。而 select 语句不支持聚合函数。
不使用 emitValue 输出结果,还可以使用 emitUpdateWithRetract 方法。与 emitValue 不同的是,emitUpdateWithRetract 用于输出已经更新的值。这个方法以回缩模式增量输出数据,也就是说,一旦有更新,我们必须在发送新的更新记录之前回缩旧的记录。如果在表聚合函数中定义了 emitUpdateWithRetract 方法,那么 emitUpdateWithRetract 方法将优先于 emitValue 方法使用,因为该方法被视为比 emitValue 更有效,因为它可以增量输出值。详见表聚合函数。
import java.lang.{Integer => JInteger}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction
/**
* Accumulator for top2.
*/
class Top2Accum {
var first: JInteger = _
var second: JInteger = _
}
/**
* The top2 user-defined table aggregate function.
*/
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {
override def createAccumulator(): Top2Accum = {
val acc = new Top2Accum
acc.first = Int.MinValue
acc.second = Int.MinValue
acc
}
def accumulate(acc: Top2Accum, v: Int) {
if (v > acc.first) {
acc.second = acc.first
acc.first = v
} else if (v > acc.second) {
acc.second = v
}
}
def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
val iter = its.iterator()
while (iter.hasNext) {
val top2 = iter.next()
accumulate(acc, top2.first)
accumulate(acc, top2.second)
}
}
def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
// emit the value and rank
if (acc.first != Int.MinValue) {
out.collect(JTuple2.of(acc.first, 1))
}
if (acc.second != Int.MinValue) {
out.collect(JTuple2.of(acc.second, 2))
}
}
}
val top2 = new Top2
val orders: Table = tableEnv.from("Orders")
val result = orders
.groupBy($"key")
.flatAggregate(top2($"a") as ($"v", $"rank"))
.select($"key", $"v", $"rank")
注意:对于流式查询,计算查询结果所需的状态可能会无限增长,这取决于聚合的类型和不同分组键的数量。请提供具有有效保留时间间隔的查询配置,以防止状态大小过大。详情请参见查询配置。
- Group Window FlatAggregate(Streaming)
在分组窗口和可能一个或多个分组键上对一个表进行分组和聚合。你必须用 select 语句关闭 “flatAggregate”。而 select 语句不支持聚合函数。
val top2 = new Top2
val orders: Table = tableEnv.from("Orders")
val result = orders
.window(Tumble over 5.minutes on $"rowtime" as "w") // define window
.groupBy($"a", $"w") // group by key and window
.flatAggregate(top2($"b") as ($"v", $"rank"))
.select($"a", w.start, $"w".end, $"w".rowtime, $"v", $"rank") // access window properties and aggregate results
数据类型 #
请看关于数据类型的专门页面。
通用类型和(嵌套的)复合类型(例如 POJOs、tuple、行、Scala case 类)也可以是行的字段。
具有任意嵌套的复合类型的字段可以用值访问函数来访问。
通用类型被视为一个黑盒,可以通过用户定义的函数进行传递或处理。
表达式语法 #
前面几节中的一些操作符都期望有一个或多个表达式。表达式可以使用内嵌的 Scala DSL 或作为字符串来指定。请参考上面的例子来了解如何指定表达式。
这是表达式的 EBNF 语法。
expressionList = expression , { "," , expression } ;
expression = overConstant | alias ;
alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
term = product , [ ( "+" | "-" ) , product ] ;
product = unary , [ ( "*" | "/" | "%") , unary ] ;
unary = [ "!" | "-" | "+" ] , composite ;
composite = over | suffixed | nullLiteral | prefixed | atom ;
suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;
prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;
interval = timeInterval | rowInterval ;
timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
rowInterval = composite , "." , "rows" ;
suffixCast = composite , ".cast(" , dataType , ")" ;
prefixCast = "cast(" , expression , dataType , ")" ;
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
suffixAs = composite , ".as(" , fieldReference , ")" ;
prefixAs = "as(" , expression, fieldReference , ")" ;
suffixIf = composite , ".?(" , expression , "," , expression , ")" ;
prefixIf = "?(" , expression , "," , expression , "," , expression , ")" ;
suffixDistinct = composite , "distinct.()" ;
prefixDistinct = functionIdentifier , ".distinct" , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
suffixFunctionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
prefixFunctionCall = functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
atom = ( "(" , expression , ")" ) | literal | fieldReference ;
fieldReference = "*" | identifier ;
nullLiteral = "nullOf(" , dataType , ")" ;
timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
over = composite , "over" , fieldReference ;
overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;
timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
字符。在这里,literal 是一个有效的 Java 字元。字符串的字元可以使用单引号或双引号来指定。复制引号进行转义(例如 ‘It’s me.’ 或 “I ““like “dog.")。
空符。空符必须有一个类型。使用 nullOf(type)(例如 nullOf(INT))来创建一个空值。
字段引用。fieldReference 指定数据中的一列(如果使用 *
,则指定所有列),functionIdentifier 指定一个支持的标量函数。列名和函数名遵循 Java 标识符语法。
函数调用。作为字符串指定的表达式也可以使用前缀符号代替后缀符号来调用运算符和函数。
小数。如果需要处理精确的数值或大的小数,Table API 也支持 Java 的 BigDecimal 类型。在 Scala Table API 中,小数可以通过 BigDecimal(“123456”)来定义,而在 Java 中,可以通过附加一个 “p” 来表示精确,例如 123456p。
时间表示法。为了处理时间值,Table API 支持 Java SQL 的 Date, Time 和 Timestamp 类型。在 Scala Table API 中,可以通过使用 java.sql.Date.valueOf(“2016-06-27”)、java.sql.Time.valueOf(“10:10:42”) 或 java.sql.Timestamp.valueOf(“2016-06-27 10:10:42.123”) 来定义字符。Java 和 Scala Table API 还支持调用 “2016-06-27”.toDate()、“10:10:42”.toTime() 和 “2016-06-27 10:10:42.123”.toTimestamp() 来将 Strings 转换为时间类型。注意:由于 Java 的时态 SQL 类型依赖于时区,请确保 Flink 客户端和所有的 TaskManagers 使用相同的时区。
时间间隔。时间间隔可以用月数(Types.INTERVAL_MONTHS)或毫秒数(Types.INTERVAL_MILLIS)表示。同一类型的时间间隔可以加减(例如:1.小时+10.分钟)。可以将毫秒的时间间隔加到时间点上(如 “2016-08-10”.toDate + 5.days)。
Scala 表达式。Scala 表达式使用隐式转换。因此,确保在你的程序中添加通配符 import org.apache.flink.table.api._
。如果一个字词没有被当作表达式,可以使用.toExpr 如 3.toExpr 来强制转换一个字词。