Create 语句
— 焉知非鱼Create Statements
CREATE 语句
CREATE 语句用于在当前或指定的目录中注册一个表/视图/函数。注册的表/视图/函数可以在 SQL 查询中使用。
Flink SQL 目前支持以下 CREATE 语句。
- CREATE TABLE
- CREATE DATABASE
- CREATE VIEW
- CREATE FUNCTION
运行一条 CREATE 语句 #
CREATE 语句可以用 TableEnvironment 的 executeSql()方法执行,也可以在 SQL CLI 中执行。executeSql()方法对于一个成功的 CREATE 操作会返回’OK',否则会抛出一个异常。
下面的例子展示了如何在 TableEnvironment 和 SQL CLI 中运行 CREATE 语句。
// Scala
val settings = EnvironmentSettings.newInstance()...
val tableEnv = TableEnvironment.create(settings)
// SQL query with a registered table
// register a table named "Orders"
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// Execute insert SQL with a registered table
// register a TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
// run an insert SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
CREATE TABLE #
CREATE TABLE [catalog_name.][db_name.]table_name
(
{ <column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
<column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
用给定的名称创建一个表。如果目录中已经存在同名表,则抛出一个异常。
计算列
计算列是使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它是由一个非查询表达式生成的,这个表达式使用同一张表中的其他列,而不是实际存储在表中。例如,计算列可以定义为 cost AS price * quantity
。表达式可以包含物理列、常量、函数或变量的任意组合。表达式不能包含子查询。
计算列在 Flink 中通常用于在 CREATE TABLE 语句中定义时间属性。可以通过 proc AS PROCTIME()
使用系统 proctime()
函数轻松定义一个处理时间属性。另一方面,计算列可以用来派生事件时间列,因为事件时间列可能需要从现有的字段中派生出来,比如原来的字段不是 TIMESTAMP(3)类型,或者嵌套在 JSON 字符串中。
注意:
- 在源表上定义的计算列是在从源表读取后计算出来的,它可以用在下面的 SELECT 查询语句中。
- 计算列不能作为 INSERT 语句的目标。在 INSERT 语句中,SELECT 子句的模式应该与没有计算列的目标表的模式相匹配。
WATERMARK
WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
。
rowtime_column_name
定义了一个现有的列,该列被标记为表的事件时间属性。这个列的类型必须是 TIMESTAMP(3),并且是模式中的顶层列。它可以是一个计算列。
watermark_strategy_expression
定义了水印生成策略。它允许任意的非查询表达式,包括计算列,来计算水印。表达式的返回类型必须是 TIMESTAMP(3),它表示自 Epoch 以来的时间戳。只有当返回的水印是非空的,并且它的值大于之前发出的本地水印时,才会发出水印(以保留升水印的契约)。水印生成表达式由框架对每条记录进行评估。框架将定期发射最大的生成水印。如果当前的水印仍然与上一个水印相同,或者是空的,或者返回的水印值小于上一次发射的水印值,那么将不会发射新的水印。水印是在 pipeline.auto-watermark-interval
配置定义的时间间隔内发出的。如果水印间隔为 0ms,如果生成的水印不是空的,并且大于最后一个水印,则每条记录都会发出水印。
当使用事件时间语义时,表必须包含事件时间属性和水印策略。
Flink 提供了几种常用的水印策略。
-
严格的升序时间戳。WATERMARK FOR rowtime_column AS rowtime_column。
-
发出迄今为止观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。
-
升序时间戳。WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND.
-
发出迄今为止观察到的最大时间戳的水印减 1。时间戳等于或小于最大时间戳的行不会迟到。
-
绑定出顺序性的时间戳。WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit.
发出水印,水印是最大观察到的时间戳减去指定的延迟,例如:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是 5 秒的延迟水印策略。
CREATE TABLE Orders (
user BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
PRIMARY KEY
PRIMARY KEY
Flink 利用优化的一个提示。它告诉我们一个表或视图的一列或一组列是唯一的,它们不包含空值。主键中的两列都不能为空。因此,主键可以唯一地识别表中的某一行。
主键约束既可以和列定义一起声明(列约束),也可以作为单行(表约束)。对于这两种情况,只能将其声明为一个单子。如果你同时定义了多个主键约束,就会抛出一个异常。
有效性检查
SQL 标准规定,一个约束可以是 ENFORCED 或 NOT ENFORCED。这控制了约束检查是否会在输入/输出数据上执行。Flink 并不拥有数据,因此我们要支持的唯一模式是 NOT ENFORCED 模式。用户要确保查询强制执行密钥的完整性。
Flink 会假设主键的正确性,假设列的空性与主键的列对齐。连接器应该确保这些是对齐的。
注意事项: 在 CREATE TABLE 语句中,创建主键约束会改变列的可空性,也就是说,有主键约束的列是不可空的。
PARTITIONED BY
按指定的列对创建的表进行分区。如果该表被用作文件系统汇,则会为每个分区创建一个目录。
WITH OPTIONS
表属性用于创建表源/接收器。这些属性通常用于查找和创建底层连接器。
表达式 key1=val1 的键和值都应该是字符串文字。关于不同连接器的所有支持的表属性,请参见连接到外部系统中的详细信息。
注释:表名可以有三种格式。表名可以有三种格式。
- catalog_name.db_name.table_name
- db_name.table_name
- table_name。
对于 catalog_name.db_name.table_name,表将被注册到元存储中,目录名为 “catalog_name”,数据库名为 “db_name”;对于 db_name.table_name,表将被注册到执行表环境的当前目录中,数据库名为 “db_name”;对于 table_name,表将被注册到执行表环境的当前目录和数据库中。
注意事项: 用 CREATE TABLE 语句注册的表既可以作为表源,也可以作为表汇,在 DMLs 中没有引用之前,我们不能决定它是作为表源还是表汇使用。
LIKE 子句
LIKE 子句是 SQL 特征的变体/组合(特征 T171,“表定义中的 LIKE 子句"和特征 T173,“表定义中的扩展 LIKE 子句”)。该子句可用于根据现有表的定义创建一个表。此外,用户还可以扩展原表或排除其中的某些部分。与 SQL 标准不同的是,该子句必须在 CREATE 语句的顶层定义。这是因为该子句适用于定义的多个部分,而不仅仅是模式部分。
你可以使用该子句来重用(并可能覆盖)某些连接器属性,或者为外部定义的表添加水印。例如,您可以为 Apache Hive 中定义的表添加水印。
请考虑下面的示例语句。
CREATE TABLE Orders (
user BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
由此产生的 Orders_with_watermark 表将等同于用以下语句创建的表。
CREATE TABLE Orders_with_watermark (
user BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
可以用同类选项控制表功能的合并逻辑。
您可以控制以下的合并行为:
- CONSTRAINTS - 主键和唯一键等约束条件。
- GENERATED-计算列
- OPTIONS - 描述连接器和格式属性的连接器选项。
- PARTITIONS - 表的分区
- WATERMARKS - 水印声明
有三种不同的合并策略。
- INCLUDING - 包括源表的特征,对重复的条目失败,例如,如果两个表中都存在相同键的选项。
- EXCLUDING - 不包含源表的给定特征。
- OVERWRITING - 包括源表的特征,用新表的属性覆盖源表的重复条目,例如,如果两个表中都存在具有相同键的选项,则将使用当前语句中的选项。
此外,可以使用 INCLUDING/EXCLUDING ALL 选项来指定如果没有定义特定的策略应该是什么,即如果使用 EXCLUDING ALL INCLUDING WATERMARKS,则只从源表中包含水印。
例子:
-- A source table stored in a filesystem
CREATE TABLE Orders_in_file (
user BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY user
WITH (
'connector' = 'filesystem'
'path' = '...'
);
-- A corresponding table we want to store in kafka
CREATE TABLE Orders_in_kafka (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector': 'kafka'
...
)
LIKE Orders_in_file (
-- Exclude everything besides the computed columns which we need to generate the watermark for.
-- We do not want to have the partitions or filesystem options as those do not apply to kafka.
EXCLUDING ALL
INCLUDING GENERATED
);
如果您没有提供同类选项,则默认使用 INCLUDING ALL OVERWRITING OPTIONS
。
注意: 您无法控制合并物理字段的行为。这些字段将被合并,就像您应用 INCLUDING 策略一样。
CREATE CATALOG #
CREATE CATALOG catalog_name
WITH (key1=val1, key2=val2, ...)
用给定的目录属性创建一个目录。如果已经存在同名的目录,则会产生异常。
WITH OPTIONS
目录属性,用于存储与本目录相关的额外信息。表达式 key1=val1 的键和值都应该是字符串文字。
更多详情请查看目录。
CREATE DATABASE #
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
用给定的数据库属性创建一个数据库,如果目录中已经存在同名的数据库,则抛出异常。如果目录中已经存在相同名称的数据库,则会抛出异常。
IF NOT EXISTS
如果数据库已经存在,则不会发生任何事情。
WITH OPTIONS
数据库属性,用于存储与本数据库相关的额外信息。表达式 key1=val1 的键和值都应该是字符串文字。
CREATE VIEW #
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[{columnName [, columnName ]* }] [COMMENT view_comment]
AS query_expression
用给定的查询表达式创建一个视图,如果目录中已经存在同名的视图,则抛出异常。如果目录中已经存在同名的视图,则会抛出异常。
TEMPORARY
创建具有目录和数据库命名空间并覆盖视图的临时视图。
IF NOT EXISTS
如果视图已经存在,则不会发生任何事情。
CREATE FUNCTION #
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
创建一个目录函数,该函数具有目录和数据库的名称空间,并带有标识符和可选的语言标签。如果目录中已经存在同名函数,则会抛出一个异常。
如果语言标签是 JAVA/SCALA,标识符是 UDF 的完整 classpath。关于 Java/Scala UDF 的实现,请参考 User-defined Functions 了解详情。
如果语言标签是 PYTHON,标识符是 UDF 的完全限定名,例如 pyflink.table.test.test_udf.add。关于 Python UDF 的实现,更多细节请参考 Python UDFs。
TEMPORARY
创建具有目录和数据库命名空间并覆盖目录功能的临时目录功能。
TEMPORARY SYSTEM
创建没有命名空间并覆盖内置函数的临时系统函数。
IF NOT EXISTS
如果函数已经存在,则不会发生任何事情。
LANGUAGE JAVA|SCALA|PYTHON
语言标签,用于指示 Flink 运行时如何执行函数。目前只支持 JAVA、SCALA 和 PYTHON,函数的默认语言是 JAVA。
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html