Flink Datastream API 编程指南
— 焉知非鱼Flink Datastream API Programming Guide
Flink DataStream API 编程指南 #
Flink 中的 DataStream 程序是对数据流实现转换的常规程序(如过滤、更新状态、定义窗口、聚合)。数据流最初是由各种源(如消息队列、套接字流、文件)创建的。结果通过接收器(sink)返回,例如可以将数据写入文件,或标准输出(例如命令行终端)。Flink 程序可以在各种环境下运行,独立运行,或者嵌入到其他程序中。执行可以发生在本地 JVM 中,也可以发生在许多机器的集群中。
为了创建你自己的 Flink DataStream 程序,我们鼓励你从一个 Flink 程序的骨架开始,并逐步添加你自己的流转换。其余部分作为额外操作和高级功能的参考。
什么是 DataStream? #
DataStream API 的名字来自于特殊的 DataStream
类,它用于表示 Flink 程序中的数据集合。你可以把它们看作是不可改变的数据集合,可以包含重复的数据。这些数据既可以是有限的,也可以是无边界的,你用来处理它们的 API 是一样的。
DataStream 在用法上与普通的 Java Collection 类似,但在一些关键方面却有很大不同。它们是不可改变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地检查里面的元素,而只能使用 DataStream API 操作对它们进行操作,这也被称为转换。
你可以通过在 Flink 程序中添加一个源来创建一个初始的 DataStream。然后你可以从中派生新的流,并通过使用 API 方法,如 map
、filter
等来组合它们。
Flink 程序的骨架 #
Flink 程序看起来就像转换 DataStream 的普通程序。每个程序由相同的基本部分组成。
- 获取一个执行环境
- 加载/创建初始数据。
- 指定该数据的转换。
- 指定计算结果的位置。
- 触发程序执行
现在我们将对其中的每一个步骤进行概述,更多细节请参考相关章节。注意,Scala DataStream API 的所有核心类都可以在 org.apache.flink.stream.api.scala 中找到。
StreamExecutionEnvironment
是所有 Flink 程序的基础。你可以使用 StreamExecutionEnvironment
上的这些静态方法获得一个。
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常情况下,你只需要使用 getExecutionEnvironment()
,因为这将根据上下文做正确的事情:如果你在 IDE 里面执行你的程序,或者作为一个普通的 Java 程序,它将创建一个本地环境,在你的本地机器上执行你的程序。如果你从你的程序中创建了一个 JAR 文件,并通过命令行调用它,Flink 集群管理器将执行你的主方法,并且 getExecutionEnvironment()
将返回一个在集群上执行你的程序的执行环境。
对于指定数据源,执行环境有几种方法可以使用不同的方法从文件中读取数据:你可以只是逐行读取,作为 CSV 文件,或者使用任何其他提供的数据源。如果只是将文本文件作为一个行的序列来读取,你可以使用。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将为您提供一个 DataStream,然后您可以在其上应用转换来创建新的派生 DataStream。
你可以通过调用 DataStream 上的方法和转换函数来应用转换。例如,一个 map
转换看起来像这样。
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这将通过将原始集合中的每一个字符串转换为一个 Integer 来创建一个新的 DataStream。
一旦你有了一个包含最终结果的 DataStream,你就可以通过创建一个接收器(sink)将其写入外部系统。这些只是创建接收器的一些示例方法。
writeAsText(path: String)
print()
一旦你指定了完整的程序,你需要通过调用 StreamExecutionEnvironment
上的 execution()
来触发程序的执行。根据 ExecutionEnvironment
的类型,将在你的本地机器上触发执行,或者将你的程序提交到集群上执行。
execute()
方法将等待作业完成,然后返回一个 JobExecutionResult
,这个包含执行时间和累加器结果。
如果你不想等待作业完成,你可以在 StreamExecutionEnvironment
上调用 executeAysnc()
来触发异步作业执行。它将返回一个 JobClient
,你可以用它与刚刚提交的作业进行通信。例如,下面是如何通过使用 executeAsync()
来实现 execute()
的语义。
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
最后这部分关于程序执行的内容对于理解 Flink 操作何时以及如何执行至关重要。所有的 Flink 程序都是懒惰地执行的。当程序的主方法被执行时,数据加载和转换不会直接发生。相反,每个操作都被创建并添加到一个数据流图(dataflow graph)中。当执行环境上的 execute()
调用明确触发执行时,这些操作才会被实际执行。程序是在本地执行还是在集群上执行,取决于执行环境的类型
惰性求值可以让您构建复杂的程序,Flink 作为一个整体规划的单元来执行。
示例程序 #
下面的程序是一个完整的,工作的流媒体窗口单词计数应用程序的例子,它可以在5秒的窗口中计算来自 Web Socket 的单词。你可以复制和粘贴代码在本地运行它。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
To run the example program, start the input stream with netcat first from a terminal:
nc -lk 9999
只需输入一些单词,按回车键输入一个新单词。这些词将被输入到单词计数程序中。如果你想看到大于1的计数,请在5秒内反复输入同一个单词(如果你打字没那么快,请从5秒开始增加窗口大小☺)。
数据源 #
源是你的程序读取其输入的地方。你可以通过使用 StreamExecutionEnvironment.addSource(sourceFunction)
将一个源附加到你的程序中。Flink 提供了许多预先实现的 SourceFunction
,但是你可以通过实现非并行源的 SourceFunction
,或者实现并行源的 ParallelSourceFunction
接口或扩展 RichParallelSourceFunction
来编写自己的自定义源。
有几种预定义的流源(stream sources)可以从 StreamExecutionEnvironment
中访问。
基于文件的。
-
readTextFile(path)
- 逐行读取文本文件,即遵循TextInputFormat
规范的文件,并将其作为字符串返回。 -
readFile(fileInputFormat, path)
- 根据指定的文件输入格式读取(一次)文件。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter)
- 这是前面两个方法内部调用的方法。它根据给定的fileInputFormat
读取路径中的文件。根据所提供的watchType
,这个源可能会周期性地监视(每隔 interval 毫秒)路径中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY
),或者处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE
)。使用pathFilter
,用户可以进一步排除被处理的文件。
实现:
在底层下,Flink 将文件读取过程分成两个子任务(sub-tasks),即目录监控和数据读取。这些子任务中的每一个都是由一个单独的实体实现的。监控由一个单一的、非并行(并行度=1)的任务实现,而读取则由多个任务(task)并行运行。后者的并行度等于作业的并行度(job parallelism)。单个监控任务的作用是扫描目录(根据 watchType
的不同,定期或只扫描一次),找到要处理的文件,将其分割,并将这些分割的文件分配给下游的读取器。读取器是那些将读取实际数据的东西。每个分片只能由一个读取器读取,而一个读取器可以读取多个分片,一个接一个。
重要提示:
-
如果
watchType
被设置为FileProcessingMode.PROCESS_CONTINUOUSLY
,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破"精确地一次"(exactly-once)的语义,因为在文件末尾追加数据会导致其所有内容被重新处理。 -
如果
watchType
被设置为FileProcessingMode.PROCESS_ONCE
,那么源就会对路径扫描一次并退出,而不会等待读取器完成对文件内容的读取。当然,读取器会继续读取,直到读取完所有文件内容。关闭源会导致在这之后不再有检查点。这可能会导致节点故障后的恢复速度变慢,因为作业(job)将从最后一个检查点开始恢复读取。
基于 Socket 的:
socketTextStream
- 从套接字读取。元素可以用定界符分开。
基于集合的:
-
fromCollection(Seq)
- 从Java Java.util.Collection
中创建数据流。集合中的所有元素必须是相同的类型。 -
fromCollection(Iterator)
- 从迭代器中创建一个数据流。该类指定迭代器返回的元素的数据类型。 -
fromElements(elements: _*)
- 从给定的对象序列中创建一个数据流。所有对象必须是相同的类型。 -
fromParallelCollection(SplittableIterator)
- 从迭代器中并行创建数据流。该类指定了迭代器返回的元素的数据类型。 -
generateSequence(from, to)
- 在给定的区间内并行生成数字序列。
自定义的:
addSource
- 附加一个新的源函数。例如,要从 Apache Kafka 读取数据,你可以使用addSource(new FlinkKafkaConsumer010<>(...))
。更多细节请参见连接器。
数据流转换 #
请参阅 operators 以了解可用的流转换的概述。
数据接收器 #
数据接收器消耗 DataStream,并将其转发到文件、套接字、外部系统或打印。Flink 带有各种内置的输出格式,这些格式被封装在 DataStream 的操作后面。
-
writeAsText()
/TextOutputFormat
- 将元素逐行写入字符串。这些字符串是通过调用每个元素的toString()
方法获得的。 -
writeAsCsv(...)
/CsvOutputFormat
- 将元组写成逗号分隔的值文件。行和字段定界符是可配置的。每个字段的值来自对象的toString()
方法。 -
print()
/printToErr()
- 将每个元素的toString()
值打印在标准输出/标准错误流上。可以选择提供一个前缀(msg),这个前缀被添加到输出中。这可以帮助区分不同的print
调用。如果并行度大于1,输出也将被预置为产生输出的任务(task)的标识符。 -
writeUsingOutputFormat()
/FileOutputFormat
- 用于自定义文件输出的方法和基类。支持自定义对象到字节的转换。 -
writeToSocket
- 根据SerializationSchema
将元素写入 socket。 -
addSink - 调用一个自定义的 sink 函数。Flink 捆绑了连接其他系统(如 Apache Kafka)的连接器,这些连接器被实现为 sink 函数。
请注意,DataStream 上的 write*()
方法主要是为了调试的目的。它们不参与 Flink 的检查点,这意味着这些函数通常具有最多一次(at-least-once)的语义。数据冲洗到目标系统取决于 OutputFormat
的实现。这意味着并非所有发送到 OutputFormat
的元素都会立即在目标系统中显示出来。另外,在失败的情况下,这些记录可能会丢失。
为了可靠地、精确地一次性将流传送到文件系统中,请使用 flink-connector-filesystem
。此外,通过 .addSink(...)
方法的自定义实现可以参与 Flink 的检查点,以实现精确的一次语义。
迭代 #
迭代流程序实现了一个步骤函数,并将其嵌入到 IterativeStream
中。由于 DataStream 程序可能永远不会结束,所以没有最大的迭代次数。相反,你需要指定流的哪一部分被馈入到迭代中,哪一部分使用 split
转换或 filter
转发到下游。在这里,我们展示了一个迭代的例子,其中主体(重复计算的部分)是一个简单的 map
转换,而反馈回来的元素是通过使用 filter
转发到下游的元素来区分的。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})
例如,这里的程序是从一系列整数中连续减去1,直到它们达到零。
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
执行参数 #
StreamExecutionEnvironment
包含了 ExecutionConfig
,它允许为运行时设置作业特定(job specific)的配置值。
请参考执行配置,了解大多数参数的解释。这些参数专门与 DataStream API 有关。
setAutoWatermarkInterval(long milliseconds)
: 设置自动发射水印的时间间隔。你可以通过long getAutoWatermarkInterval()
来获取当前值。
容错 #
状态和检查点介绍了如何启用和配置 Flink 的检查点机制。
控制延迟 #
默认情况下,元素不会在网络上逐一传输(会造成不必要的网络流量),而是被缓冲。缓冲区(实际在机器之间传输)的大小可以在 Flink 配置文件中设置。虽然这种方法有利于优化吞吐量,但当传入的数据流速度不够快时,会造成延迟问题。为了控制吞吐量和延迟,你可以在执行环境上(或者在单个 operator 上)使用 env.setBufferTimeout(timeoutMillis)
来设置缓冲区填满的最大等待时间。过了这个时间,即使缓冲区没有满,也会自动发送。该超时的默认值为 100 ms。
使用方法:
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
为了最大限度地提高吞吐量,设置 setBufferTimeout(-1)
,这将消除超时,缓冲区只有在满时才会被刷新。为了最大限度地减少延迟,将超时设置为接近0的值(例如5或10毫秒)。应该避免缓冲区超时为0,因为它会导致严重的性能下降。
调试 #
在分布式集群中运行一个流程序之前,最好先确保实现的算法能够按照预期的方式运行。因此,实现数据分析程序通常是一个检查结果、调试和改进的渐进过程。
Flink 提供了一些功能,通过支持 IDE 内的本地调试、测试数据的注入和结果数据的收集,大大简化了数据分析程序的开发过程。本节给出一些提示,如何简化 Flink 程序的开发。
本地执行环境 #
LocalStreamEnvironment
在它创建的同一个 JVM 进程中启动 Flink 系统。如果你从 IDE 中启动 LocalEnvironment
,你可以在代码中设置断点,轻松调试你的程序。
LocalEnvironment
的创建和使用方法如下。
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
收集数据源 #
Flink 提供了特殊的数据源,这些数据源由 Java 集合支持,以方便测试。一旦程序被测试,源和接收器就可以很容易地被从外部系统读取/写入的源和接收器所替代。
集合数据源的使用方法如下。
val env = StreamExecutionEnvironment.createLocalEnvironment()
// 从元素列表中创建一个 DataStream
val myInts = env.fromElements(1, 2, 3, 4, 5)
// 从任何集合中创建一个 DataStream
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// 从迭代器中创建一个 DataStream
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注:目前,集合数据源要求数据类型和迭代器实现 Serializable
。此外,集合数据源不能并行执行( parallelism = 1)。
迭代器数据接收器 #
Flink 还提供了一个收集 DataStream 结果的接收器(sink),用于测试和调试目的。它的使用方法如下。
import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
注意:flink-streaming-contrib
模块从 Flink 1.5.0 中移除。它的类被移到 flink-streaming-java
和 flink-streaming-scala
中。