临时表
— 焉知非鱼Temporal Tables
时间表
时间表代表了对变化表的(参数化)视图的概念,该视图返回一个表在特定时间点的内容。
变化表可以是跟踪变化的变化历史表(如数据库变化日志),也可以是将变化具体化的变化维度表(如数据库表)。
对于变化的历史表,Flink 可以跟踪变化,并允许在查询中的某个时间点访问表的内容。在 Flink 中,这种表用 Temporal Table Function 来表示。
对于变化的维度表,Flink 允许在查询内的处理时间点访问表的内容。在 Flink 中,这种表是由一个 Temporal Table 来表示的。
动机 #
与不断变化的历史表相关联 #
假设我们有以下表格 RatesHistory。
SELECT * FROM RatesHistory;
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
RatesHistory 代表了一个不断增长的对日元(汇率为 1)的货币汇率附加表。例如,从 09:00 到 10:45,欧元对日元的汇率是 114,从 10:45 到 11:15 是 116。从 10:45 到 11:15 是 116。
考虑到我们希望输出 10:58 时的所有当前汇率,我们将需要以下 SQL 查询来计算结果表。
SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
SELECT MAX(rowtime)
FROM RatesHistory AS r2
WHERE r2.currency = r.currency
AND r2.rowtime <= TIME '10:58');
相关子查询确定相应货币的最大时间低于或等于期望时间。外层查询列出具有最大时间戳的汇率。
下表显示了这种计算的结果。在我们的例子中,10:45 的欧元更新被考虑在内,然而,11:15 的欧元更新和新输入的英镑在 10:58 的表格中没有被考虑。
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
10:45 Euro 116
Temporal Tables 的概念旨在简化此类查询,加快其执行速度,并减少 Flink 的状态使用。Temporal Table 是一个关于 append-only 表的参数化视图,它将 append-only 表的行解释为表的 changelog,并提供该表在特定时间点的版本。将 append-only 表解释为变更日志需要指定一个主键属性和一个时间戳属性。主键决定哪些行会被覆盖,时间戳决定行的有效时间。
在上面的例子中,currency 是 RatesHistory 表的主键,rowtime 是时间戳属性。
在 Flink 中,这是由一个 Temporal Table Function 来表示的。
与变化的维度表相关联 #
另一方面,有些用例需要加入一个不断变化的维度表,而这个表是一个外部数据库表。
让我们假设 LatestRates 是一张表(例如存储在),它是以最新的速率来物化的。LatestRates 是物化的历史 RatesHistory。那么 LatestRates 表在时间 10:58 时的内容将是。
10:58> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Yen 1
Euro 116
12:00 时 LatestRates 表的内容将是:
12:00> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Yen 1
Euro 119
Pounds 108
在 Flink 中,这用一个时间表来表示。
时间表函数 #
为了访问时态表中的数据,必须传递一个时间属性,该属性决定了将返回的表的版本。Flink 使用表函数的 SQL 语法来提供一种表达方式。
一旦定义好,一个时态表函数就会接受一个单一的时间参数 timeAttribute,并返回一组行。这个集合包含了所有现有主键相对于给定时间属性的最新版本的行。
假设我们基于 RatesHistory 表定义了一个时态表函数 Rates(timeAttribute)
,我们可以用下面的方式查询这样的函数。
SELECT * FROM Rates('10:15');
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
SELECT * FROM Rates('11:00');
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
10:45 Euro 116
09:00 Yen 1
每次查询 Rates(timeAttribute)
都会返回给定时间属性的 Rates 的状态。
注意:目前,Flink 不支持直接查询带有恒定时间属性参数的时态表函数。目前,时态表函数只能用于连接。上面的例子是用来提供对函数 Rates(timeAttribute)
返回内容的直观认识。
关于如何使用时态表进行联接的更多信息,还请参见关于连续查询的连接页面。
定义时态表函数 #
下面的代码片段说明了如何从一个仅有追加的表创建一个时态表函数。
// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// Provide a static data set of the rates history table.
val ratesHistoryData = new mutable.MutableList[(String, Long)]
ratesHistoryData.+=(("US Dollar", 102L))
ratesHistoryData.+=(("Euro", 114L))
ratesHistoryData.+=(("Yen", 1L))
ratesHistoryData.+=(("Euro", 116L))
ratesHistoryData.+=(("Euro", 119L))
// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
val ratesHistory = env
.fromCollection(ratesHistoryData)
.toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
tEnv.createTemporaryView("RatesHistory", ratesHistory)
// Create and register TemporalTableFunction.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
val rates = ratesHistory.createTemporalTableFunction($"r_proctime", $"r_currency") // <==== (1)
tEnv.registerFunction("Rates", rates) // <==== (2)
第(1)行创建了一个 Rates 时态表函数,这使得我们可以使用 Table API 中的函数 Rates。
第(2)行在我们的表环境中以 Rates 的名义注册这个函数,这使得我们可以在 SQL 中使用 Rates 函数。
时态表 #
注意: 这只在 Blink planner 中支持。
为了访问时间表中的数据,目前必须定义一个 LookupableTableSource 的 TableSource。Flink 使用 SQL:2011 中提出的 SQL 语法 FOR SYSTEM_TIME AS OF 来查询时间表。
假设我们定义了一个名为 LatestRates 的时态表,我们可以用下面的方式查询这样的表。
SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
currency rate
======== ======
US Dollar 102
Euro 116
Yen 1
注意:目前,Flink 不支持直接查询时间恒定的时态表。目前,时态表只能用在 join 中。上面的例子是用来提供一个直观的时间表 LatestRates 返回的内容。
更多关于如何使用时态表进行连接的信息,请参见关于连续查询的连接页面。
定义时态表 #
// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().build()
val tEnv = StreamTableEnvironment.create(env, settings)
// or val tEnv = TableEnvironment.create(settings)
// Define an HBase table with DDL, then we can use it as a temporal table in sql
// Column 'currency' is the rowKey in HBase table
tEnv.executeSql(
s"""
|CREATE TABLE LatestRates (
| currency STRING,
| fam1 ROW<rate DOUBLE>
|) WITH (
| 'connector' = 'hbase-1.4',
| 'table-name' = 'Rates',
| 'zookeeper.quorum' = 'localhost:2181'
|)
|""".stripMargin)
也请参见如何定义 LookupableTableSource 的页面。