处理应用程序参数
— 焉知非鱼Handling Application Parameters
处理应用程序参数
几乎所有的 Flink 应用,包括批处理和流式应用,都依赖于外部配置参数,它们用于指定输入和输出源(如路径或地址)、系统参数(并行性、运行时配置)和应用特定参数(通常在用户函数中使用)。它们用于指定输入和输出源(如路径或地址)、系统参数(并行性、运行时配置)和应用程序特定参数(通常在用户函数中使用)。
Flink 提供了一个名为 ParameterTool 的简单工具,为解决这些问题提供一些基本的工具。请注意,你不一定要使用这里描述的 ParameterTool。其他框架如 Commons CLI和argparse4j 也能很好地与 Flink 一起工作。
将你的配置值导入 ParameterTool 之中
ParameterTool 提供了一组预定义的静态方法来读取配置。该工具内部期待的是一个 Map<String,String>
,所以很容易将其与自己的配置风格整合在一起。
从 .properties
文件中
下面的方法将读取一个属性文件并提供键/值对。
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
从命令行参数来看
这就允许从命令行中获取 --input hdfs://mydata --elements 42
这样的参数。
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
从系统属性
当启动 JVM 时,你可以将系统属性传递给它。-Dinput=hdfs://mydata
。你也可以从这些系统属性中初始化 ParameterTool。
ParameterTool parameter = ParameterTool.fromSystemProperties();
在 Flink 程序中使用参数
现在我们已经从某个地方得到了参数(见上文),我们可以以各种方式使用它们。
直接从 ParameterTool 中使用
ParameterTool 本身有访问值的方法。
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
你可以在客户端提交应用程序的 main()
方法中直接使用这些方法的返回值。例如,你可以这样设置一个操作符的并行性。
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
由于 ParameterTool 是可序列化的,所以你可以把它传递给函数本身。
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在函数内部使用它从命令行获取值。
全局注册参数
在 ExecutionConfig 中注册为全局作业参数的参数可以作为配置值从 JobManager Web 界面和用户定义的所有功能中访问。
全局注册参数。
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在任何丰富的用户功能中访问它们。
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/application_parameters.html