Hive Read and Write
— 焉知非鱼Hive Read and Write
Hive 读写
使用 HiveCatalog 和 Flink 与 Hive 的连接器,Flink 可以从 Hive 数据中读取和写入数据,作为 Hive 批处理引擎的替代。请务必按照说明在你的应用中加入正确的依赖关系。同时请注意,Hive 连接器只适用于 blink planner。
从 Hive 读取数据 #
假设 Hive 在其默认的数据库中包含一个名为 people 的单表,该表包含多条记录。
hive> show databases;
OK
default
Time taken: 0.841 seconds, Fetched: 1 row(s)
hive> show tables;
OK
Time taken: 0.087 seconds
hive> CREATE TABLE mytable(name string, value double);
OK
Time taken: 0.127 seconds
hive> SELECT * FROM mytable;
OK
Tom 4.72
John 8.0
Tom 24.2
Bob 3.14
Bob 4.72
Tom 34.9
Mary 4.79
Tiff 2.72
Bill 4.33
Mary 77.7
Time taken: 0.097 seconds, Fetched: 10 row(s)
数据准备好后,你可以连接到现有的 Hive 安装并开始查询。
Flink SQL> show catalogs;
myhive
default_catalog
# ------ Set the current catalog to be 'myhive' catalog if you haven't set it in the yaml file ------
Flink SQL> use catalog myhive;
# ------ See all registered database in catalog 'mytable' ------
Flink SQL> show databases;
default
# ------ See the previously registered table 'mytable' ------
Flink SQL> show tables;
mytable
# ------ The table schema that Flink sees is the same that we created in Hive, two columns - name as string and value as double ------
Flink SQL> describe mytable;
root
|-- name: name
|-- type: STRING
|-- name: value
|-- type: DOUBLE
# ------ Select from hive table or hive view ------
Flink SQL> SELECT * FROM mytable;
name value
__________ __________
Tom 4.72
John 8.0
Tom 24.2
Bob 3.14
Bob 4.72
Tom 34.9
Mary 4.79
Tiff 2.72
Bill 4.33
Mary 77.7
查询 Hive 视图 #
如果你需要查询 Hive 视图,请注意。
在查询该目录中的视图之前,必须先使用 Hive 目录作为当前目录。可以通过 Table API 中的 tableEnv.useCatalog(...)
或者 SQL Client 中的 USE CATALOG …来实现。
Hive 和 Flink SQL 有不同的语法,例如,不同的保留关键字和字元。请确保视图的查询与 Flink 语法兼容。
写入 Hive #
同样,也可以使用 INSERT 子句将数据写入 hive 中。
考虑有一个名为 “mytable “的示例表,表中有两列:name 和 age,类型为 string 和 int。
# ------ INSERT INTO will append to the table or partition, keeping the existing data intact ------
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
# ------ INSERT OVERWRITE will overwrite any existing data in the table or partition ------
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
我们也支持分区表,考虑有一个名为 myparttable 的分区表,有四列:name、age、my_type 和 my_date,在 type 中…my_type
和 my_date
是分区键。
# ------ Insert with static partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
# ------ Insert with dynamic partition ------
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
# ------ Insert with static(my_type) and dynamic(my_date) partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
格式 #
我们测试了以下表格存储格式:文本、csv、SequenceFile、ORC 和 Parquet。
优化 #
分区修剪 #
Flink 使用分区修剪作为一种性能优化,以限制 Flink 在查询 Hive 表时读取的文件和分区的数量。当你的数据被分区后,当查询符合某些过滤条件时,Flink 只会读取 Hive 表中的分区子集。
投影下推 #
Flink 利用投影下推,通过从表扫描中省略不必要的字段,最大限度地减少 Flink 和 Hive 表之间的数据传输。
当一个表包含许多列时,它尤其有利。
限制下推 #
对于带有 LIMIT 子句的查询,Flink 会尽可能地限制输出记录的数量,以减少跨网络传输的数据量。
读取时的向量优化 #
当满足以下条件时,会自动使用优化功能。
- 格式: ORC 或 Parquet。
- 没有复杂数据类型的列,如 hive 类型: List, Map, Struct, Union。
这个功能默认是开启的。如果出现问题,可以使用这个配置选项来关闭 Vectorized Optimization。
table.exec.hive.fallback-mapred-reader=true
Source 并行性推断 #
默认情况下,Flink 根据分割次数来推断 hive 源的并行度,分割次数是根据文件的数量和文件中的块数来推断的。
Flink 允许你灵活配置并行度推断的策略。你可以在 TableConfig 中配置以下参数(注意,这些参数会影响作业的所有源)。
Key | Default | Type | Description |
---|---|---|---|
table.exec.hive.infer-source-parallelism | true | Boolean | 如果为真,则根据分割数来推断源的并行度,如果为假,则根据配置来设置源的并行度。如果为 false,则通过配置来设置源的并行度。 |
table.exec.hive.infer-source-parallelism.max | 1000 | Integer | 设置源运算符的最大推断并行度。 |
路线图 #
我们正在规划并积极开发支持功能,如:
- ACID 表
- 分桶表
- 更多格式
更多功能需求请联系社区 https://flink.apache.org/community.html#mailing-lists
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html