Python API 指南
— 焉知非鱼Python API Tutorial
Python API 指南 #
本演练将快速让你开始构建一个纯 Python Flink 项目。
关于如何设置 Python 执行环境,请参考 Python Table API 安装指南。
设置一个 Python 项目 #
您可以先创建一个 Python 项目,然后按照安装指南安装 PyFlink 包。
编写一个 Flink Python Table API 程序 #
Table API 应用程序通过声明一个表环境开始;对于批处理应用程序,可以是 BatchTableEvironment,对于流式应用程序,可以是 StreamTableEnvironment。这作为与 Flink 运行时交互的主要入口点。它可以用来设置执行参数,如重启策略、默认并行度等。表配置允许设置 Table API 的具体配置。
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
在创建的表环境中,可以声明 source/sink 表。
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
你也可以使用 TableEnvironment.sql_update()
方法来注册 DDL 中定义的 source/sink 表。
my_source_ddl = """
create table mySource (
word VARCHAR
) with (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/tmp/input'
)
"""
my_sink_ddl = """
create table mySink (
word VARCHAR,
`count` BIGINT
) with (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/tmp/output'
)
"""
t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)
这将在执行环境中注册一个名为 mySource 的表和一个名为 mySink 的表。表 mySource 只有一列,即 word,它消耗从文件 /tmp/input
中读取的字符串。表 mySink 有两列,分别是 word 和 count,将数据写入文件 /tmp/output
,用 /t
作为字段分隔符。
现在,你可以创建一个作业(job),它从表 mySource 中读取输入,预先执行一些转换,并将结果写入表 mySink。
t_env.from_path('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
最后你必须执行实际的 Flink Python Table API 作业。所有的操作,如创建源、转换和 sink 都是懒惰的。只有当 t_env.execute(job_name)
被调用时,作业才会被运行。
t_env.execute("tutorial_job")
到目前为止,完整的代码如下:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource') \
.group_by('word') \
.select('word, count(1)') \
.insert_into('mySink')
t_env.execute("tutorial_job")
执行 Flink Python Table API 程序 #
首先,你需要在 “/tmp/input” 文件中准备输入数据。你可以选择以下命令行来准备输入数据。
$ echo -e "flink\npyflink\nflink" > /tmp/input
接下来,你可以在命令行上运行这个例子(注意:如果结果文件 “/tmp/output” 已经存在,你需要在运行这个例子之前删除该文件)。
$ python WordCount.py
该命令在本地小型集群中构建并运行 Python Table API 程序。你也可以将 Python Table API 程序提交到远程集群,详情可以参考 Job Submission Examples。
最后,您可以在命令行中看到执行结果。
$ cat /tmp/output
flink 2
pyflink 1
这应该可以让你开始编写自己的 Flink Python Table API 程序。要了解更多关于 Python Table API 的信息,你可以参考 Flink Python Table API Docs 了解更多细节。