Operators
— 焉知非鱼Operators
操作符 #
操作符将一个或多个 DataStream 转换为一个新的 DataStream。程序可以将多个变换组合成复杂的数据流拓扑。
本节给出了基本变换的描述,应用这些变换后的有效物理分区,以及对 Flink 的操作符链的见解。
DataStream 转换 #
- Map
DataStream → DataStream
接受一个元素并产生一个元素。一个将输入流的值翻倍的 map
函数:
dataStream.map { x => x * 2 }
- FlatMap
DataStream → DataStream
接受一个元素并产生零个、一个或多个元素。一个将句子分割成单词的 flatMap
函数:
dataStream.flatMap { str => str.split(" ") }
- Filter
DataStream → DataStream
评估每个元素的布尔函数,并保留那些函数返回值为真的元素。一个过滤掉零值的 filter
:
dataStream.filter { _ != 0 }
- KeyBy
DataStream → KeyedStream
在逻辑上将一个流划分为互斥的分区,每个分区包含相同键的元素。在内部,这是通过哈希分区实现的。关于如何指定键,请参见 keys。这个转换会返回一个 KeyedStream
:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
- Reduce
KeyedStream → DataStream
在 keyed 数据流上进行"滚动"换算(reduce)。将当前元素与最后一个换算的值合并,并发出新的值。
一个创建部分和(sum)流的 reduce
函数:
keyedStream.reduce { _ + _ }
- Fold
KeyedStream → DataStream
在一个带有初始值的 keyed 数据流上进行"滚动"折叠。将当前元素与最后一个折叠的值结合起来,并发出新的值。
一个折叠函数,当应用于序列(1,2,3,4,5)时,发出序列 “start-1”、“start-1-2”、“start-1-2-3”、…
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
- Aggregations
KeyedStream → DataStream
在 keyed 数据流上进行滚动聚合。min
和 minBy
的区别在于 min
返回最小值,而 minBy
则返回该字段中具有最小值的元素(max
和 maxBy
也一样)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
- Window
KeyedStream → WindowedStream
可以在已经分区的 KeyedStream
上定义 Window
。窗口根据一些特征(例如,最近5秒内到达的数据)对每个键中的数据进行分组。关于窗口的描述,请参见窗口。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
- WindowAll
DataStream → AllWindowedStream
可以在常规的 DataStream 上定义窗口。窗口根据一些特征(例如,在过去5秒内到达的数据)对所有流事件进行分组。关于窗口的完整描述,请参见窗口。
警告:在许多情况下,这是一个非并行的转换。所有的记录将被收集在 windowAll
操作符的一个任务(task)中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
- Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将一般函数应用于整个窗口。下面是一个手动求和窗口元素的函数。
注意:如果您使用的是 windowAll
转换,您需要使用 AllWindowFunction
来代替。
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
- Window Reduce
WindowedStream → DataStream
对窗口应用函数式的 reduce
函数,并返回换算后的值:
windowedStream.reduce { _ + _ }
- Window Fold
WindowedStream → DataStream
对窗口应用功能 fold
函数并返回折叠后的值。示例函数应用于序列 (1,2,3,4,5)
时,将序列折叠成字符串 “start-1-2-3-4-5”:
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
- 窗口上的聚合
WindowedStream → DataStream
聚合一个窗口的内容。min
和 minBy
的区别在于 min
返回最小值,而 minBy
返回在该字段中具有最小值的元素(max
和 maxBy
相同)。
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
- Union
DataStream* → DataStream
联合两个或多个数据流,创建一个新的流,包含所有流的所有元素。注意:如果你把一个数据流和它自己联合起来,你将在生成的数据流中得到每个元素两次。
dataStream.union(otherStream1, otherStream2, ...)
- Window Join
DataStream,DataStream → DataStream
在一个给定的键和一个公共窗口上连接(join)两个数据流。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
- Window CoGroup
DataStream,DataStream → DataStream
在一个给定的键和一个共同的窗口上将两个数据流串联(Cogroups)起来。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
- Connect
DataStream,DataStream → ConnectedStreams
“连接”(connect)两个数据流,保留其类型,允许两个数据流之间共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
- CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接(connected)数据流上的 map
和 flatMap
:
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
- Split
DataStream → SplitStream
根据某种标准,将流分成两个或多个流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
- Select
SplitStream → DataStream
从分割流中选择一个或多个流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
- Iterate
DataStream → IterativeStream → DataStream
在流(flow)中创建一个"反馈"循环,将一个操作符的输出重定向到之前的某个操作符。这对于定义持续更新模型的算法特别有用。下面的代码从一个流(stream)开始,连续应用迭代体。大于0的元素被送回反馈通道,其余元素被转发到下游。参见迭代的完整描述。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
通过匿名模式匹配从 tuple、case 类和集合中提取,比如下面:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
不受 API 开箱即用的支持。要使用这个功能,你应该使用 Scala API 扩展。
以下转换可用于 Tuples 的数据流:
- Project
DataStream → DataStream
从元组中选择一个字段的子集。
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
物理分区 #
Flink 还可以通过以下函数对转换后的准确流分区进行低级控制(如果需要)。
- 自定义分区
DataStream → DataStream
使用用户定义的 Partitioner 为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
- 随机分区
DataStream → DataStream
将元素按照均匀分布随机分区。
dataStream.shuffle()
- Rebalancing (循环分区)
DataStream → DataStream
对元素进行循环分区,使每个分区的负载相等。在数据倾斜的情况下,对性能优化很有用。
dataStream.rebalance()
- Rescaling
DataStream → DataStream
将元素,轮回分区到下游操作的子集。如果你想拥有管道,例如,从源的每个并行实例向几个映射器(mappers)的子集扇出,以分配负载,但又不想进行 rebalance()
会引起的完全再平衡,那么这就很有用。这将只需要本地数据传输,而不是通过网络传输数据,这取决于其他配置值,如 TaskManagers 的槽数(slots)。
上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行程度。例如,如果上游操作的并行度为2,下游操作的并行度为4,那么一个上游操作将向两个下游操作分发元素,而另一个上游操作将向另外两个下游操作分发。另一方面,如果下游操作具有并行度2,而上游操作具有并行度4,那么两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。
在不同的并行度不是彼此的倍数的情况下,一个或几个下游操作将从上游操作中获得不同数量的输入。
请看此图,可以直观地看到上例中的连接(connection)模式。
dataStream.rescale()
- Broadcasting
DataStream → DataStream
将元素广播到每个分区。
dataStream.broadcast()
任务链和资源组 #
链式的两个后续变换意味着将它们共同放置在同一个线程中以获得更好的性能。如果可能的话,Flink 默认会将操作符链起来(例如,两个后续的 map 变换)。如果需要的话,API 提供了对链式操作的精细控制。
如果你想在整个作业(job)中禁用链,请使用 StreamExecutionEnvironment.disableOperatorChaining()
。对于更细粒度的控制,以下函数是可用的。请注意,这些函数只能在 DataStream 转换之后使用,因为它们引用了之前的转换。例如,你可以使用 someStream.map(...).startNewChain()
,但你不能使用 someStream.startNewChain()
。
资源组是 Flink 中的一个槽,参见 slots。如果需要,你可以在单独的槽中手动隔离操作符。
- Start new chain
开始一个新的链,从这个操作符开始。两个映射器(mappers)将被连锁,filter
将不会被连锁到第一个映射器(mapper)。
someStream.filter(...).map(...).startNewChain().map(...)
- Disable chaining
不将 map
运算符连锁化。
someStream.map(...).disableChaining()
- Set slot sharing group
设置操作的槽位共享组。Flink 会将具有相同槽位共享组的操作放入同一个槽位,而将没有槽位共享组的操作保留在其他槽位。这可以用来隔离槽位。如果所有的输入操作都在同一个槽共享组中,槽共享组就会从输入操作中继承。缺省槽共享组的名称是 “default”,操作可以通过调用 slotSharingGroup("default")
来明确地放入这个组。
someStream.filter(...).slotSharingGroup("name")
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/