侧输出
— 焉知非鱼Side Outputs
Side Output #
除了 DataStream 操作产生的主流(main stream)外,还可以产生任意数量的附加侧输出结果流。结果流中的数据类型不必与主流中的数据类型相匹配,不同侧输出的类型也可以不同。当您要分割数据流时,这种操作非常有用,通常您必须复制数据流,然后从每个数据流中过滤掉您不想要的数据。
在使用侧输出时,首先需要定义一个 OutputTag
,用来识别侧输出流。
val outputTag = OutputTag[String]("side-output")
请注意 OutputTag
是如何根据侧输出流所包含的元素类型进行类型化的。
可以通过以下函数向侧输出发送数据。
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
你可以使用 Context
参数(在上面的函数中暴露给用户)向一个由 OutputTag
标识的侧输出发送数据。下面是一个从 ProcessFunction
中发射侧输出数据的例子。
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[Int, Int] {
override def processElement(
value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
// emit data to regular output
out.collect(value)
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value))
}
})
为了检索侧输出流,你可以在 DataStream 操作的结果上使用 getSideOutput(OutputTag)
。这将给你一个 DataStream,它的类型是侧输出流的结果。
val outputTag = OutputTag[String]("side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html