HiveCatalog
— 焉知非鱼HiveCatalog
HiveCatalog
Hive Metastore 经过多年的发展,已经成为 Hadoop 生态系统中事实上的元数据中心。很多公司在生产中都有一个 Hive Metastore 服务实例来管理他们所有的元数据,无论是 Hive 元数据还是非 Hive 元数据,都是真理的来源。
对于同时部署了 Hive 和 Flink 的用户,HiveCatalog 可以让他们使用 Hive Metastore 来管理 Flink 的元数据。
对于只有 Flink 部署的用户来说,HiveCatalog 是 Flink 开箱即用的唯一持久化目录。如果没有持久化目录,用户使用 Flink SQL CREATE DDL 必须在每个会话中反复创建元对象,比如 Kafka 表,这就浪费了很多时间。HiveCatalog 填补了这一空白,使用户只需创建一次表和其他元对象,以后就可以跨会话方便地引用和管理它们。
设置 HiveCatalog #
依赖性 #
在 Flink 中设置 HiveCatalog 需要与 Flink-Hive 集成相同的依赖关系。
配置 #
在 Flink 中设置 HiveCatalog 需要与 Flink-Hive 集成相同的配置。
如何使用 HiveCatalog #
一旦配置得当,HiveCatalog 应该可以直接使用。用户可以用 DDL 创建 Flink 元对象,并在创建后立即看到它们。
HiveCatalog 可以用来处理两种表。Hive 兼容表和通用表。Hive-compatible 表是指那些以 Hive 兼容的方式存储的表,在存储层的元数据和数据方面都是如此。因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端进行查询。
而通用表则是针对 Flink 的。当使用 HiveCatalog 创建通用表时,我们只是使用 HMS 来持久化元数据。虽然这些表对 Hive 是可见的,但 Hive 不太可能理解这些元数据。因此在 Hive 中使用这样的表会导致未定义的行为。
Flink 使用属性 “is_generic” 来判断一个表是与 Hive 兼容还是通用。当用 HiveCatalog 创建一个表时,它默认被认为是通用的。如果你想创建一个 Hive 兼容的表,请确保在你的表属性中把 is_generic 设置为 false。
如上所述,通用表不应该从 Hive 使用。在 Hive CLI 中,你可以调用 describe FORMATTED 对一个表进行检查,通过检查 is_generic 属性来决定它是否是通用的。通用表会有 is_generic=true。
例子 #
我们在这里将通过一个简单的例子进行讲解。
第 1 步:设置 Hive Metastore。
有一个 Hive Metastore 在运行。
在这里,我们在本地路径 /opt/hive-conf/hive-site.xml
中设置一个本地 Hive Metastore 和我们的 hive-site.xml 文件。我们有如下的一些配置。
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
<description>metadata is stored in a MySQL server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>MySQL JDBC driver class</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>...</value>
<description>user name for connecting to mysql server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>...</value>
<description>password for connecting to mysql server</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>IP address (or fully-qualified domain name) and port of the metastore host</description>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>true</value>
</property>
</configuration>
用 Hive Cli 测试连接到 HMS。运行一些命令,我们可以看到我们有一个名为 default 的数据库,里面没有表。
hive> show databases;
OK
default
Time taken: 0.032 seconds, Fetched: 1 row(s)
hive> show tables;
OK
Time taken: 0.028 seconds, Fetched: 0 row(s)
步骤 2:配置 Flink 集群和 SQL CLI
将所有 Hive 的依赖关系添加到 Flink 发行版的 /lib
目录下,并修改 SQL CLI 的 yaml 配置文件 sql-cli-defaults.yaml 如下。
execution:
planner: blink
type: streaming
...
current-catalog: myhive # set the HiveCatalog as the current catalog of the session
current-database: mydatabase
catalogs:
- name: myhive
type: hive
hive-conf-dir: /opt/hive-conf # contains hive-site.xml
第三步:建立 Kafka 集群
Bootstrap 一个本地的 Kafka 2.3.0 集群,主题命名为 “test”,并以 name 和 age 的元组形式向主题产生一些简单的数据。
localhost$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>tom,15
>john,21
这些消息可以通过启动 Kafka 控制台消费者看到。
localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
tom,15
john,21
第四步:启动 SQL Client,用 Flink SQL DDL 创建一个 Kafka 表。
启动 Flink SQL Client,通过 DDL 创建一个简单的 Kafka 2.3.0 表,并验证其模式。
Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'update-mode' = 'append'
);
[INFO] Table has been created.
Flink SQL> DESCRIBE mykafka;
root
|-- name: STRING
|-- age: INT
验证该表也是通过 Hive Cli 对 Hive 可见的,注意该表有属性 is_generic=true。
hive> show tables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1 row(s)
hive> describe formatted mykafka;
OK
# col_name data_type comment
# Detailed Table Information
Database: default
Owner: null
CreateTime: ......
LastAccessTime: UNKNOWN
Retention: 0
Location: ......
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector.properties.bootstrap.servers localhost:9092
flink.connector.topic test
flink.connector.type kafka
flink.connector.version universal
flink.format.type csv
flink.generic.table.schema.0.data-type VARCHAR(2147483647)
flink.generic.table.schema.0.name name
flink.generic.table.schema.1.data-type INT
flink.generic.table.schema.1.name age
flink.update-mode append
is_generic true
transient_lastDdlTime ......
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.158 seconds, Fetched: 36 row(s)
第五步:运行 Flink SQL 查询 Kakfa 表。
在 Flink 集群中,无论是单机还是 yarn-session,从 Flink SQL Client 中运行一个简单的选择查询。
Flink SQL> select * from mykafka;
在 Kafka 主题中多产生一些信息。
localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
tom,15
john,21
kitty,30
amy,24
kaiky,18
你现在应该可以在 SQL Client 中看到 Flink 产生的结果,如。
SQL Query Result (Table)
Refresh: 1 s Page: Last of 1
name age
tom 15
john 21
kitty 30
amy 24
kaiky 18
支持的类型 #
HiveCatalog 支持通用表的所有 Flink 类型。
对于 Hive 兼容的表,HiveCatalog 需要将 Flink 数据类型映射到相应的 Hive 类型,如下表所述。
Flink Data Type | Hive Data Type |
---|---|
CHAR(p) | CHAR(p) |
VARCHAR(p) | VARCHAR(p) |
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
DATE | DATE |
TIMESTAMP(9) | TIMESTAMP |
BYTES | BINARY |
ARRAY | LIST |
MAP<K, V> | MAP<K, V> |
ROW | STRUCT |
关于类型映射需要注意的地方。
- Hive 的 CHAR(p) 最大长度为 255。
- Hive 的 VARCHAR(p) 最大长度为 65535。
- Hive 的 MAP 只支持基元键类型,而 Flink 的 MAP 可以是任何数据类型。
- 不支持 Hive 的 UNION 类型。
- Hive 的 TIMESTAMP 的精度总是 9,不支持其他精度。而 Hive 的 UDF 则可以处理精度<=9 的 TIMESTAMP 值。
- Hive 不支持 Flink 的 TIMESTAMP_WITH_TIME_ZONE、TIMESTAMP_WITH_LOCAL_TIME_ZONE 和 MULTISET。
- Flink 的 INTERVAL 类型还不能映射到 Hive 的 INTERVAL 类型。
Scala Shell #
注意:由于目前 Scala Shell 并不支持 blink planner,所以不建议在 Scala Shell 中使用 Hive 连接器。
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html