INSERT Statements
焉知非鱼
INSERT statements are used to add rows to a table.
Run an INSERT statement #
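A single INSERT statement can be executed through the executeSql() method of the TableEnvironment, or in the SQL CLI. For an INSERT statement, executeSql() submits a Flink job immediately and returns a TableResult instance associated with the submitted job. Multiple INSERT statements can be executed through the addInsertSql() method of a StatementSet, which is created by TableEnvironment.createStatementSet(). addInsertSql() is lazy: the added statements are executed only when StatementSet.execute() is invoked.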
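The following examples show how to run a single INSERT statement in the SQL CLI and in a TableEnvironment, and how to run multiple INSERT statements in a StatementSet. The SQL CLI session comes first, followed by the Scala program.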
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);
Flink SQL> SHOW TABLES;
Orders
RubberOrders
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance()...
val tEnv = TableEnvironment.create(settings)
// register a source table named "Orders" and a sink table named "RubberOrders"
tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
// run a single INSERT query on the registered source table and emit the result to the registered sink table
val tableResult1 = tEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// get job status through TableResult
println(tableResult1.getJobClient().get().getJobStatus())
//----------------------------------------------------------------------------
// register another sink table named "GlassOrders" for multiple INSERT queries
tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
// run multiple INSERT queries on the registered source table and emit the results to the registered sink tables
val stmtSet = tEnv.createStatementSet()
// each call to `addInsertSql` accepts exactly one INSERT statement
stmtSet.addInsertSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
stmtSet.addInsertSql(
  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
// execute all statements together
val tableResult2 = stmtSet.execute()
// get job status through TableResult
println(tableResult2.getJobClient().get().getJobStatus())
Insert from select queries #
Query results can be inserted into tables by using the INSERT clause.
Syntax #
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
OVERWRITE
INSERT OVERWRITE overwrites any existing data in the table or partition. Otherwise, new data is appended.
PARTITION
The PARTITION clause should contain the static partition columns of this insertion.
Examples #
-- Creates a partitioned table
-- (`user` and `date` are reserved keywords in Flink SQL, so they are quoted with backticks)
CREATE TABLE country_page_view (`user` STRING, cnt INT, `date` STRING, country STRING)
PARTITIONED BY (`date`, country)
WITH (...);
-- Appends rows into the static partition (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (`date`='2019-8-30', country='China')
SELECT `user`, cnt FROM page_view_source;
-- Appends rows into partition (date, country), where date is a static partition with value '2019-8-30'
-- and country is a dynamic partition whose value is determined dynamically by each row.
INSERT INTO country_page_view PARTITION (`date`='2019-8-30')
SELECT `user`, cnt, country FROM page_view_source;
-- Overwrites rows in the static partition (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (`date`='2019-8-30', country='China')
SELECT `user`, cnt FROM page_view_source;
-- Overwrites rows in partition (date, country), where date is a static partition with value '2019-8-30'
-- and country is a dynamic partition whose value is determined dynamically by each row.
INSERT OVERWRITE country_page_view PARTITION (`date`='2019-8-30')
SELECT `user`, cnt, country FROM page_view_source;
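The syntax also allows the target table to be qualified with a catalog and a database name, which none of the examples here use. A minimal sketch, assuming country_page_view was created in Flink's built-in default_catalog and default_database (an assumption; substitute the catalog and database the table actually lives in):

```sql
-- Assumption: country_page_view is registered in default_catalog.default_database.
-- Appends rows into the static partition (date='2019-8-30', country='China')
-- using a fully qualified table name.
INSERT INTO default_catalog.default_database.country_page_view
  PARTITION (`date`='2019-8-30', country='China')
  SELECT `user`, cnt FROM page_view_source;
```

Qualifying the name this way can help when the session's current catalog or database differs from the one that holds the sink table.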
Insert values into tables #
The INSERT ... VALUES statement can be used to insert data into tables directly from SQL.
Syntax #
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
values_row:
(val1 [, val2, ...])
OVERWRITE
INSERT OVERWRITE overwrites any existing data in the table. Otherwise, new data is appended.
Examples #
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
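For comparison, a sketch of the OVERWRITE form against the same students table. It assumes the connector configured for students supports overwriting, which may not hold for every sink; the row values are made up for illustration.

```sql
-- Assumption: the sink behind `students` supports INSERT OVERWRITE.
-- Replaces any existing rows in students with the single row below.
INSERT OVERWRITE students
  VALUES ('wilma flintstone', 33, 3.10);
```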
Original article: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html