Python 演练
— 焉知非鱼Python Walkthrough
Python 演练 #
Stateful Functions 为构建健壮的、有状态的事件驱动的应用程序提供了一个平台。它提供了对状态和时间的精细控制,这使得高级系统的实现成为可能。在本步骤指南中,您将学习如何使用 Stateful Functions API 构建有状态的应用程序。
你要构建什么? #
就像软件中所有伟大的介绍一样,这个演练将从开头开始:打招呼。该应用程序将运行一个简单的函数,该函数将接受一个请求并以问候语进行响应。它不会试图涵盖所有复杂的应用程序开发,而是专注于构建一个有状态的函数 - 这是你实现业务逻辑的地方。
先决条件 #
这个演练假设您对 Python 有一定的了解,但即使您来自不同的编程语言,您也应该能够跟上。
帮助,我卡住了 #
如果你被卡住了,请查看社区支持资源。特别是 Apache Flink 的用户邮件列表,一直被认为是 Apache 项目中最活跃的一个,也是快速获得帮助的好方法。
如何跟进 #
如果你想跟上,你需要一台装有 Python 3 以及 Docker 的电脑。
注意:为了简洁起见,本演练中的每个代码块可能不包含完整的周边类。完整的代码可以在本页底部找到。
你可以通过点击这里下载一个包含骨架项目的 zip 文件。
解压包后,你会发现一些文件。这些文件包括 dockerfiles 和数据生成器,用于在本地自包含环境中运行此演练。
$ tree statefun-walkthrough
statefun-walkthrough
├── Dockerfile
├── docker-compose.yml
├── generator
│ ├── Dockerfile
│ ├── event-generator.py
│ └── messages_pb2.py
├── greeter
│ ├── Dockerfile
│ ├── greeter.py
│ ├── messages.proto
│ ├── messages_pb2.py
│ └── requirements.txt
└── module.yaml
从事件开始 #
Stateful Functions 是一个事件驱动的系统,所以开发从定义我们的事件开始。问候者应用程序将使用协议缓冲区定义其事件。当一个特定用户的问候请求被摄入时,它将被路由到相应的函数。响应将返回一个适当的问候。第三种类型,SeenCount,是一个实用类,后期将用于帮助管理用户到目前为止被看到的次数。
syntax = "proto3";
package example;
// External request sent by a user who wants to be greeted
message GreetRequest {
// The name of the user to greet
string name = 1;
}
// A customized response sent to the user
message GreetResponse {
// The name of the user being greeted
string name = 1;
// The users customized greeting
string greeting = 2;
}
// An internal message used to store state
message SeenCount {
// The number of times a users has been seen so far
int64 seen = 1;
}
我们的第一个函数 #
在底层,消息是使用有状态的函数来处理的,也就是任何绑定到 StatefulFunction
运行时的两个参数函数。函数用 @function.bind
装饰器绑定到运行时。当绑定一个函数时,它会被注解为一个函数类型。这是在向这个函数发送消息时用来引用它的名称。
当你打开文件 greeter/greeter.py
时,你应该看到以下代码。
from statefun import StatefulFunctions
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, greet_request):
pass
一个有状态函数需要两个参数,即上下文和消息。上下文提供了对有状态函数运行时功能的访问,如状态管理和消息传递。您将在本演练中探索其中的一些功能。
另一个参数是传递给这个函数的输入消息。默认情况下,消息是以 protobuf Any 的形式传递的。如果一个函数只接受一个已知的类型,你可以使用 Python 3 类型语法覆盖消息类型。这样您就不需要对消息进行拆包或检查类型。
from messages_pb2 import GreetRequest
from statefun import StatefulFunctions
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
pass
发送回复 #
有状态函数接受消息,也可以将消息发送出去。消息可以被发送到其他函数,以及外部系统(或出口)。
一个流行的外部系统是 Apache Kafka。第一步,让我们更新 greeter/greeter.py
中的函数,通过向 Kafka 主题发送问候语来响应每个输入。
from messages_pb2 import GreetRequest, GreetResponse
from statefun import StatefulFunctions
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
response = GreetResponse()
response.name = greet_request.name
response.greeting = "Hello {}".format(greet_request.name)
egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
context.pack_and_send_egress("example/greets", egress_message)
对于每条消息,都会构造一个响应,并发送到一个名为 greetings
的 Kafka 主题,该主题按名称分区。egress_message
被发送到一个名为 example/greets
的出口。这个标识符指向一个特定的 Kafka 集群,并在下面的部署中进行配置。
一个有状态的 Hello #
这是一个很好的开端,但并没有展现出有状态函数的真正威力 - 与状态一起工作。假设你想根据每个用户发送请求的次数,为他们生成个性化的响应。
def compute_greeting(name, seen):
"""
Compute a personalized greeting, based on the number of times this @name had been seen before.
"""
templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
if seen < len(templates):
greeting = templates[seen] % name
else:
greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
response = GreetResponse()
response.name = name
response.greeting = greeting
return response
为了"记住"多条问候信息,你需要将一个持久化的值域( seen_count
)关联到 Greet
函数。对于每个用户,函数现在可以跟踪他们被看到的次数。
@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
state = context.state('seen_count').unpack(SeenCount)
if not state:
state = SeenCount()
state.seen = 1
else:
state.seen += 1
context.state('seen_count').pack(state)
response = compute_greeting(greet_request.name, state.seen)
egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
context.pack_and_send_egress("example/greets", egress_message)
状态 seen_count
始终是当前名称的范围,因此它可以独立地跟踪每个用户。
连接在一起 #
有状态的 Function 应用程序使用 http 与 Apache Flink 运行时进行通信。Python SDK 提供了一个 RequestReplyHandler,它可以基于 RESTful HTTP POSTS 自动分配函数调用。RequestReplyHandler 可以使用任何 HTTP 框架暴露。
一个流行的 Python web 框架是 Flask。它可以用来快速、轻松地将应用程序暴露给 Apache Flink 运行时。
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greeter(context, message: GreetRequest):
pass
handler = RequestReplyHandler(functions)
# Serve the endpoint
from flask import request
from flask import make_response
from flask import Flask
app = Flask(__name__)
@app.route('/statefun', methods=['POST'])
def handle():
response_data = handler(request.data)
response = make_response(response_data)
response.headers.set('Content-Type', 'application/octet-stream')
return response
if __name__ == "__main__":
app.run()
配置运行时 #
有状态函数运行时通过向 Flask 服务器进行 http 调用来向 greeter
函数发出请求。要做到这一点,它需要知道它可以使用什么端点来到达服务器。这也是配置我们连接到输入和输出 Kafka 主题的好时机。配置在一个名为 module.yaml 的文件中。
version: "1.0"
module:
meta:
type: remote
spec:
functions:
- function:
meta:
kind: http
type: example/greeter
spec:
endpoint: http://python-worker:8000/statefun
states:
- seen_count
maxNumBatchRequests: 500
timeout: 2min
ingresses:
- ingress:
meta:
type: statefun.kafka.io/routable-protobuf-ingress
id: example/names
spec:
address: kafka-broker:9092
consumerGroupId: my-group-id
topics:
- topic: names
typeUrl: com.googleapis/example.GreetRequest
targets:
- example/greeter
egresses:
- egress:
meta:
type: statefun.kafka.io/generic-egress
id: example/greets
spec:
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
transactionTimeoutMillis: 100000
这个配置做了一些有趣的事情。
首先是声明我们的函数 example/greeter
。它包括它可以到达的端点以及函数可以访问的状态。
ingress
是将 GreetRequest
消息路由到函数的输入 Kafka 主题。除了 broker 地址和消费者组等基本属性,它还包含一个目标列表。这些是每个消息将被发送到的函数。
出口是输出的 Kafka 集群。它包含 broker 特定的配置,但允许每个消息路由到任何主题。
部署 #
现在已经构建了 greeter
应用程序,是时候部署了。部署 Stateful Function 应用程序最简单的方法是使用社区提供的基础映像并加载你的模块。基础镜像提供了 Stateful Function 运行时,它将使用提供的 module.yaml 来为这个特定的工作进行配置。这可以在根目录下的 Docker 文件中找到。
FROM flink-statefun:2.2.0
RUN mkdir -p /opt/statefun/modules/greeter
ADD module.yaml /opt/statefun/modules/greeter
现在您可以使用提供的 Docker 设置在本地运行此应用程序。
$ docker-compose up -d
那么,要想在行动中看到例子,就看看话题问候出来的内容。
docker-compose logs -f event-generator
想更进一步? #
这个 Greeter 永远不会忘记一个用户。试着修改这个函数,使它能够为任何没有与系统交互的用户花超过60秒的时间重置 seen_count
。
查看 Python SDK 页面以获得更多关于如何实现这一功能的信息。
完整应用 #
from messages_pb2 import SeenCount, GreetRequest, GreetResponse
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
from statefun import kafka_egress_record
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
state = context.state('seen_count').unpack(SeenCount)
if not state:
state = SeenCount()
state.seen = 1
else:
state.seen += 1
context.state('seen_count').pack(state)
response = compute_greeting(greet_request.name, state.seen)
egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
context.pack_and_send_egress("example/greets", egress_message)
def compute_greeting(name, seen):
"""
Compute a personalized greeting, based on the number of times this @name had been seen before.
"""
templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
if seen < len(templates):
greeting = templates[seen] % name
else:
greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
response = GreetResponse()
response.name = name
response.greeting = greeting
return response
handler = RequestReplyHandler(functions)
#
# Serve the endpoint
#
from flask import request
from flask import make_response
from flask import Flask
app = Flask(__name__)
@app.route('/statefun', methods=['POST'])
def handle():
response_data = handler(request.data)
response = make_response(response_data)
response.headers.set('Content-Type', 'application/octet-stream')
return response
if __name__ == "__main__":
app.run()