Sdk
— 焉知非鱼Sdk
SDK #
有状态函数应用程序由一个或多个模块组成。一个模块是一个由运行时加载的函数捆绑,并提供给消息。来自所有加载模块的函数都是多路复用的,并且可以自由地相互发送消息。
有状态函数支持两种类型的模块。远程模块和嵌入式模块。
远程模块 #
远程模块作为 Apache Flink® 运行时的外部进程运行;在同一容器中,作为 sidecar,使用无服务器平台或其他外部位置。这种模块类型可以支持任何数量的语言 SDK。远程模块通过 YAML 配置文件在系统中注册。
技术指标 #
一个远程模块配置由一个元部分和一个规范部分组成。meta
包含了模块的辅助信息;而 spec
则描述了模块中包含的功能并定义了它们的持久值。
定义函数 #
module.spec.functions 声明了一个由远程模块实现的函数对象列表。一个函数通过一些属性来描述。
- function.meta.kind
- 用于与远程功能通信的协议。
- 所支持的值 - http
- function.meta.type
- 函数类型那个, 被定义为
<namespace>/<name>
。
- 函数类型那个, 被定义为
- function.spec.endpoint
- 函数可到达的端点。
- 所支持的 schemes: http, https.
- 使用 http+unix 或 https+unix 方案支持通过 UNIX 域套接字进行传输。
- 当使用 UNIX 域套接字时,端点格式是:
http+unix://<socket-file-path>/<serve-url-path>
。例如,http+unix:///uds.sock/path/of/url
。
- function.spec.states
- 在远程函数中声明的持久化值的列表
- 每个条目由
name
属性和可选的expireAfter
属性组成。 - expireAfter 的默认值为 0,表示状态过期被禁用。
- function.spec.maxNumBatchRequests
- 在调用系统背压之前,一个函数可以处理的特定地址的最大记录数。
- 默认值:1000
- function.spec.timeout
- 运行时在失败前等待远程函数返回的最长时间。这涵盖了整个调用过程,包括连接到函数端点、编写请求、函数处理和读取响应。
- 默认值:1分钟
- function.spec.connectTimeout
- 运行时等待连接到远程函数端点的最长时间。
- 默认值:10秒。
- function.spec.readTimeout
- 运行时等待单个读IO操作的最大时间,如读取调用响应。
- 默认值:10秒。
- function.spec.writeTimeout
- 运行时等待单个写IO操作的最大时间,比如写调用请求。
- 默认值:10秒。
完整示例 #
version: "2.0"
module:
meta:
type: remote
spec:
functions:
- function:
meta:
kind: http
type: example/greeter
spec:
endpoint: http://<host-name>/statefun
states:
- name: seen_count
expireAfter: 5min
maxNumBatchRequests: 500
timeout: 2min
嵌入式模块 #
嵌入式模块与 Apache Flink® 运行时共存,并嵌入其中。
这种模块类型只支持基于 JVM 的语言,并通过实现 StatefulFunctionModule 接口来定义。嵌入模块提供了一个单一的配置方法,有状态的函数根据其函数类型与系统绑定。运行时配置可以通过 globalConfiguration 来实现,它是应用程序 flink-conf.yaml 中前缀 statefun.module.global-config 下的所有配置以及以 --key value
形式传递的任何命令行参数的联合。
package org.apache.flink.statefun.docs;
import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
public class BasicFunctionModule implements StatefulFunctionModule {
public void configure(Map<String, String> globalConfiguration, Binder binder) {
// Declare the user function and bind it to its type
binder.bindFunctionProvider(FnWithDependency.TYPE, new CustomProvider());
// Stateful functions that do not require any configuration
// can declare their provider using java 8 lambda syntax
binder.bindFunctionProvider(Identifiers.HELLO_TYPE, unused -> new FnHelloWorld());
}
}
嵌入式模块利用 Java 的服务提供者接口(SPI)进行发现。这意味着每个 JAR 都应该在 META_INF/services
资源目录下包含一个文件 org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
,该文件列出了它提供的所有可用模块。
org.apache.flink.statefun.docs.BasicFunctionModule