rakulang, dartlang, nimlang, golang, rustlang, lang lang no see

Python API 指南

焉知非鱼

Python API Tutorial

Python API 指南 #

本演练将快速让你开始构建一个纯 Python Flink 项目。

关于如何设置 Python 执行环境,请参考 Python Table API 安装指南

设置一个 Python 项目 #

您可以先创建一个 Python 项目,然后按照安装指南安装 PyFlink 包。

Table API 应用程序通过声明一个表环境开始;对于批处理应用程序,可以是 BatchTableEvironment,对于流式应用程序,可以是 StreamTableEnvironment。这作为与 Flink 运行时交互的主要入口点。它可以用来设置执行参数,如重启策略、默认并行度等。表配置允许设置 Table API 的具体配置。

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

在创建的表环境中,可以声明 source/sink 表。

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

你也可以使用 TableEnvironment.sql_update() 方法来注册 DDL 中定义的 source/sink 表。

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

这将在执行环境中注册一个名为 mySource 的表和一个名为 mySink 的表。表 mySource 只有一列,即 word,它消耗从文件 /tmp/input 中读取的字符串。表 mySink 有两列,分别是 wordcount,将数据写入文件 /tmp/output,用 /t 作为字段分隔符。

现在,你可以创建一个作业(job),它从表 mySource 中读取输入,预先执行一些转换,并将结果写入表 mySink

t_env.from_path('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

最后你必须执行实际的 Flink Python Table API 作业。所有的操作,如创建源、转换和 sink 都是懒惰的。只有当 t_env.execute(job_name) 被调用时,作业才会被运行。

t_env.execute("tutorial_job")

到目前为止,完整的代码如下:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("tutorial_job")

首先,你需要在 “/tmp/input” 文件中准备输入数据。你可以选择以下命令行来准备输入数据。

$ echo -e  "flink\npyflink\nflink" > /tmp/input

接下来,你可以在命令行上运行这个例子(注意:如果结果文件 “/tmp/output” 已经存在,你需要在运行这个例子之前删除该文件)。

$ python WordCount.py

该命令在本地小型集群中构建并运行 Python Table API 程序。你也可以将 Python Table API 程序提交到远程集群,详情可以参考 Job Submission Examples

最后,您可以在命令行中看到执行结果。

$ cat /tmp/output
flink	2
pyflink	1

这应该可以让你开始编写自己的 Flink Python Table API 程序。要了解更多关于 Python Table API 的信息,你可以参考 Flink Python Table API Docs 了解更多细节。