rakulang, dartlang, nimlang, golang, rustlang, lang lang no see

本地执行

焉知非鱼

Local Execution

本地执行 #

Flink 可以在一台机器上运行,甚至在一台 Java 虚拟机中运行。这使得用户可以在本地测试和调试 Flink 程序。本节将对本地执行机制进行概述。

本地环境和执行器允许你在本地 Java 虚拟机中运行 Flink 程序,或者作为现有程序的一部分在任何 JVM 中运行。大多数例子可以通过简单地点击 IDE 的"运行"按钮在本地启动。

Flink 中支持两种不同的本地执行。LocalExecutionEnvironment 是启动完整的 Flink 运行时,包括一个 JobManager 和一个 TaskManager。这些包括内存管理和所有在集群模式下执行的内部算法。

CollectionEnvironment 是在 Java 集合上执行 Flink 程序。这种模式不会启动完整的 Flink 运行时,所以执行的开销非常低,而且是轻量级的。例如,一个 DataSet.map() 转换将通过将 map() 函数应用于 Java 列表中的所有元素来执行。

调试 #

如果你在本地运行 Flink 程序,你也可以像其他 Java 程序一样调试你的程序。你可以使用 System.out.println() 来写出一些内部变量,也可以使用调试器。可以在 map()reduce() 和其他所有方法中设置断点。也请参考 Java API 文档中的调试部分,了解 Java API 中的测试和本地调试工具的指南。

Maven 依赖 #

如果你是在 Maven 项目中开发程序,你必须使用这个依赖关系添加 flink-clients 模块。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.11.0</version>
</dependency>

本地环境 #

LocalEnvironment 是 Flink 程序本地执行的一个句柄。使用它可以在本地 JVM 中运行程序–独立或嵌入其他程序中。

本地环境是通过 ExecutionEnvironment.createLocalEnvironment() 方法实例化的。默认情况下,它将使用与你的机器有多少 CPU 核(硬件上下文)一样多的本地线程来执行。您也可以指定所需的并行度。本地环境可以配置为使用 enableLogging()/disableLogging() 将日志记录到控制台。

在大多数情况下,调用 ExecutionEnvironment.getExecutionEnvironment() 是更好的方法。当程序在本地(命令行接口之外)启动时,该方法会返回一个 LocalEnvironment,当程序被命令行接口调用时,该方法会返回一个预配置的集群执行环境。

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    DataSet<String> data = env.readTextFile("file:///path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}

在执行结束后返回的 JobExecutionResult 对象,包含了程序运行时间和累加器结果。

LocalEnvironment 还允许向 Flink 传递自定义配置值。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

注意:本地执行环境不启动任何 Web 前端来监控执行。

收集环境 #

使用 CollectionEnvironment 在 Java 集合上执行是一种执行 Flink 程序的低开销方法。这种模式的典型用例是自动测试、调试和代码重用。

用户可以使用为批处理而实现的算法,也可以用于交互性更强的情况。Flink 程序的一个稍微改变的变体可以用于 Java 应用服务器中处理传入的请求。

基于集合执行的骨架:

public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}

flink-examples-batch 模块包含一个完整的例子,叫做 CollectionExecutionExample。

请注意,基于集合的 Flink 程序的执行只可能在小数据上执行,小数据适合 JVM 堆。在集合上的执行不是多线程的,只使用一个线程。

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/local_execution.html