窗口
— 焉知非鱼Windows
窗口
窗口是处理无限流的核心。窗口将流分割成有限大小的"桶",我们可以对其应用计算。本文档主要介绍 Flink 中如何进行窗口化,以及程序员如何从其提供的功能中最大限度地受益。
下面介绍了一个窗口化 Flink 程序的一般结构。第一个片段指的是 keyed 流,而第二个片段指的是 non-keyed 流。正如人们所看到的那样,唯一的区别是 keyed 流的 keyBy(...)
调用和 non-keyed 流的 window(...)
变成了 windowAll(...)
。这也将作为本页面其他内容的路线图。
Keyed 窗口
stream
.keyBy(...) <- keyed 与 non-keyed 窗口的对比
.window(...) <- 必须的: "assigner"
[.trigger(...)] <- 可选的: "trigger" (否则使用默认的 trigger)
[.evictor(...)] <- 可选的: "evictor" (否则没有 evictor)
[.allowedLateness(...)] <- 可选的: "lateness" (否则为零)
[.sideOutputLateData(...)] <- 可选的: "output tag" (否则迟到数据无侧输出)
.reduce/aggregate/fold/apply() <- 必须的: "function"
[.getSideOutput(...)] <- 可选的: "output tag"
Non-Keyed 窗口
stream
.windowAll(...) <- 必须的: "assigner"
[.trigger(...)] <- 可选的: "trigger" (否则使用默认的 trigger)
[.evictor(...)] <- 可选的: "evictor" (否则没有 evictor)
[.allowedLateness(...)] <- 可选的: "lateness" (否则为零)
[.sideOutputLateData(...)] <- 可选的: "output tag" (否则迟到数据无侧输出)
.reduce/aggregate/fold/apply() <- 必须的: "function"
[.getSideOutput(...)] <- 可选的: "output tag"
在上面,方括号中的命令([...]
)是可选的。这表明 Flink 允许你以多种不同的方式定制你的窗口逻辑,以便它最适合你的需求。
窗口生命周期 #
简而言之,当第一个应该属于这个窗口的元素到达时,就会创建一个窗口,当时间(事件时间或处理时间)经过(passes)它的结束时间戳加上用户指定的允许延迟时,这个窗口就会被完全移除(见允许延迟)。Flink 只保证对基于时间的窗口进行移除,而不保证对其他类型的窗口,如全局窗口进行移除(见窗口分配器)。例如,基于事件-时间的窗口策略每5分钟创建一个非重叠(或翻滚)的窗口,并且允许的延迟为1分钟,当第一个具有时间戳的元素落入这个区间时,Flink 将为 12:00 和 12:05 之间的区间创建一个新的窗口,当水印通过 12:06 的时间戳时,它将删除它。
此外,每个窗口将有一个触发器(见触发器)和一个函数(ProcessWindowFunction、ReduceFunction、AggregateFunction或FoldFunction)(见窗口函数)。函数将包含要应用于窗口内容的计算,而触发器则指定了窗口被认为可以应用函数的条件。触发策略可能是"当窗口中的元素数量超过4时",或者"当水印经过窗口的末端时"。触发器还可以决定在创建和删除窗口之间的任何时间(any time between its creation and removal)清除窗口的内容。在这种情况下,清除只指窗口中的元素,而不是窗口元数据。这意味着新的数据仍然可以被添加到该窗口中。
除上述之外,您还可以指定一个 Evictor(见 Evictors),它将能够在触发器触发后以及在函数应用之前和/或之后从窗口中删除元素。
在下文中,我们将对上述每个组件进行更详细的介绍。我们先从上述代码段中必须的部分开始(参见 Keyed vs Non-Keyed 窗口、窗口分配器和窗口函数),然后再转向可选部分。
Keyed 与 Non-Keyed 窗口的对比 #
首先要指定的是您的流是否应该是 keyed 的。这必须在定义窗口之前完成。使用 keyBy(...)
将把您的无限流分割成逻辑 keyed 流。如果没有调用 keyBy(...)
,那么您的流就不是 keyed 流。
在 keyed 流的情况下,传入事件的任何属性都可以被用作键(更多细节在这里)。拥有一个 keyed 流将允许你的窗口计算由多个任务并行执行,因为每个逻辑 keyed 流可以独立于其他流进行处理。所有指向同一键的元素将被发送到同一个并行任务(task)。
在 non-keyed 流的情况下,您的原始流不会被分割成多个逻辑流,所有的窗口化逻辑将由一个任务(task)来执行,即并行度为1。
窗口分配器 #
在指定流是否是 keyed 流之后,下一步是定义窗口分配器。窗口分配器定义了如何将元素分配给窗口。这是通过在 window(...)
(对于 keyed 流)或 windowAll()
(对于 non-keyed 流)调用中指定您所选择的 WindowAssigner
来实现的。
WindowAssigner
负责将每个传入的元素分配给一个或多个窗口。Flink 为最常见的用例提供了预定义的窗口分配器,即滚动窗口、滑动窗口、会话窗口和全局窗口。您也可以通过扩展 WindowAssigner
类来实现自定义窗口分配器。所有内置的窗口分配器(除了全局窗口)都是基于时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。请查看我们关于事件时间的部分,了解处理时间和事件时间之间的区别,以及时间戳和水印是如何生成的。
基于时间的窗口有一个开始时间戳(包括)和结束时间戳(不包括),共同描述窗口的大小。在代码中,Flink 在处理基于时间的窗口时使用了 TimeWindow
,它有查询开始和结束时间戳的方法,还有一个额外的方法 maxTimestamp()
,可以返回给定窗口的最大允许时间戳。
在下文中,我们将展示 Flink 的预定义窗口分配器是如何工作的,以及如何在 DataStream 程序中使用它们。下图直观地展示了每个分配器的工作情况。紫色的圆圈代表流的元素,这些元素被某个键(在本例中是用户1、用户2和用户3)分割。x轴显示的是时间的进度。
滚动窗口 #
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口。滚动窗口有一个固定的大小,并且不重叠。例如,如果你指定了一个大小为5分钟的滚动窗口,那么当前的窗口将被评估,并且每隔5分钟就会启动一个新的窗口,如下图所示。
以下代码片段展示了如何使用滚动窗口。
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
时间间隔可以使用 Time.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
等中的一种来指定。
如最后一个例子所示,滚动窗口分配器还可以采用一个可选的偏移量(offset
)参数,用于改变窗口的对齐方式。例如,在没有偏移量的情况下,每小时的滚动窗口与纪元对齐,也就是说,你会得到诸如 1:00:00.000 - 1:59:59.999
,2:00:00.000 - 2:59:59.999
等窗口。如果你想改变这一点,你可以给出一个偏移量。例如,如果偏移量为15分钟,您将得到 1:15:00.000 - 2:14:59.999
,2:15:00.000 - 3:14:59.999
等。偏移量的一个重要用途是调整窗口到 UTC-0 以外的时区。例如,在中国,你必须指定一个 Time.hours(-8)
的偏移量。
滑动窗口 #
滑动窗口分配器将元素分配给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小(window size)参数配置。一个额外的窗口滑动(window slide)参数控制滑动窗口的启动频率。因此,如果滑动窗口的滑块小于窗口大小,滑动窗口可以重叠。在这种情况下,元素被分配到多个窗口。
例如,你可以有10分钟大小的窗口,滑动5分钟。这样,每隔5分钟就会有一个窗口,包含过去10分钟内到达的事件,如下图所示。
以下代码片段展示了如何使用滑动窗口。
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
时间间隔可以通过使用 Time.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
等中的一个来指定。
如上一个例子所示,滑动窗口分配器还可以采取一个可选的偏移量(offset
)参数,用于改变窗口的对齐方式。例如,在没有偏移量的情况下,每小时滑动30分钟的窗口与纪元对齐,也就是说,你将得到 1:00:00.000 - 1:59:59.999
,1:30:00.000 - 2:29:59.999
等窗口。如果你想改变这一点,你可以给出一个偏移量。例如,如果偏移量为15分钟,您将得到 1:15:00.000 - 2:14:59.999
,1:45:00.000 - 2:44:59.999
等。偏移量的一个重要用途是调整窗口到 UTC-0 以外的时区。例如,在中国,你必须指定一个 Time.hours(-8)
的偏移。
会话窗口 #
会话窗口分配器按活动的会话对元素进行分组。与滚动窗口和滑动窗口不同,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一定时间内没有接收到元素时,也就是在不活动的间隙发生时,会话窗口就会关闭。会话窗口分配器可以配置一个静态的会话间隙(session gap),也可以配置一个会话间隙提取函数,该函数定义了多长时间的不活动期。当这个时间段(period)到期(expires)时,当前会话关闭,后续元素被分配到一个新的会话窗口。
以下代码片段展示了如何使用会话窗口。
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
静态间隙可以通过使用 Time.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
等之一来指定。
动态间隙可以通过实现 SessionWindowTimeGapExtractor
接口来指定。
注意: 由于会话窗口没有固定的开始和结束,所以它们的评估方式与滚动和滑动窗口不同。在内部,会话窗口操作符为每个到达的记录创建一个新的窗口,如果它们彼此之间的距离比定义的间隙更近,就会将窗口合并在一起。为了能够合并,会话窗口操作符需要一个合并触发器和一个合并窗口函数,如 ReduceFunction、AggregateFunction 或 ProcessWindowFunction(FoldFunction 不能合并)。
全局窗口 #
全局窗口分配器将具有相同键的所有元素分配到同一个全局窗口。只有当你还指定了一个自定义触发器时,这种窗口方案才有用。否则,任何计算都不会被执行,因为全局窗口没有一个自然的终点,我们可以在那里处理聚集的元素。
下面的代码片段展示了如何使用全局窗口。
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
窗口函数 #
在定义了窗口分配器之后,我们需要指定我们要对这些窗口中的每一个窗口进行的计算。这是窗口函数的责任,一旦系统确定一个窗口准备好进行处理,它就会用来处理每个(可能是 keyed 的)窗口的元素(关于 Flink 如何确定窗口准备好,请参见触发器)。
窗口函数可以是 ReduceFunction
、AggregateFunction
、FoldFunction
或 ProcessWindowFunction
中的一种。前两个可以更有效地执行(见状态大小部分),因为 Flink 可以在每个窗口到达时增量地聚合元素。ProcessWindowFunction
可以为一个窗口中包含的所有元素获取一个 Iterable
,以及关于元素所属窗口的附加元信息。
带有 ProcessWindowFunction
的窗口化转换不能像其他情况一样高效执行,因为 Flink 在调用函数之前必须在内部缓冲一个窗口的所有元素。通过将 ProcessWindowFunction
与 ReduceFunction
、AggregateFunction
或 FoldFunction
结合起来,既可以得到窗口元素的增量聚合,也可以得到 ProcessWindowFunction
接收到的额外的窗口元数据,从而缓解这种情况。我们将查看这些变体的每个例子。
ReduceFunction #
ReduceFunction
指定了如何将输入的两个元素组合起来以产生相同类型的输出元素。Flink 使用 ReduceFunction
来增量聚合一个窗口的元素。
ReduceFunction
可以这样定义和使用。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上面的例子把一个窗口中所有元素的元组的第二个字段相加起来。
AggregateFunction #
AggregateFunction
是 ReduceFunction
的通用版本,它有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中元素的类型,AggregateFunction 有一个方法用于将一个输入元素添加到累加器中。该接口还有创建一个初始累加器、将两个累加器合并成一个累加器以及从一个累加器中提取一个输出(类型为 OUT)的方法。我们将在下面的例子中看到这些方法是如何工作的。
和 ReduceFunction 一样,Flink 会在窗口的输入元素到达时,对它们进行增量聚合。
AggregateFunction 可以这样定义和使用。
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
上面的例子是计算窗口中元素的第二个字段的平均值。
FoldFunction #
FoldFunction 指定了窗口的输入元素如何与输出类型的元素相结合。对于添加到窗口的每个元素和当前的输出值,都会递增地调用 FoldFunction。第一个元素与输出类型的预定义初始值相结合。
可以这样定义和使用 FoldFunction。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
上面的例子将所有输入的 Long 值追加到一个初始的空字符串中。
注意 fold()
不能用于会话窗口或其他可合并窗口。
ProcessWindowFunction #
ProcessWindowFunction 得到一个包含窗口所有元素的 Iterable,以及一个可以访问时间和状态信息的 Context 对象,这使得它能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口被认为可以处理为止。
ProcessWindowFunction 的签名如下。
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* Returns the window that is being evaluated.
*/
def window: W
/**
* Returns the current processing time.
*/
def currentProcessingTime: Long
/**
* Returns the current event-time watermark.
*/
def currentWatermark: Long
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
}
}
注意 key
参数是通过为 keyBy()
调用指定的 KeySelector
提取的键。如果是元组索引键或字符串字段引用,这个键的类型总是 Tuple,你必须手动将其转换为一个正确大小的元组来提取键字段。
ProcessWindowFunction
可以这样定义和使用。
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
这个例子显示了一个 ProcessWindowFunction
,它可以计算一个窗口中的元素。此外,窗口函数还将窗口的信息添加到输出中。
注意,使用 ProcessWindowFunction 进行简单的聚合,如 count
,效率相当低。下一节将展示如何将 ReduceFunction
或 AggregateFunction
与 ProcessWindowFunction
结合起来,以获得增量聚合和 ProcessWindowFunction
的附加信息。
具有增量聚合功能的 ProcessWindowFunction #
ProcessWindowFunction
可以与 ReduceFunction
、AggregateFunction
或 FoldFunction
相结合,以在元素到达窗口时进行增量聚合。当窗口关闭时,ProcessWindowFunction
将被提供聚合的结果。这使得它可以增量计算窗口,同时可以访问 ProcessWindowFunction
的附加窗口元信息。
注意 您也可以使用 legacy WindowFunction 代替 ProcessWindowFunction 进行增量窗口聚合。
使用 ReduceFunction 进行增量窗口聚合 #
下面的例子展示了如何将增量 ReduceFunction 与 ProcessWindowFunction 相结合,以返回窗口中最小的事件以及窗口的开始时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)
用 AggregateFunction 进行增量窗口聚合 #
下面的例子展示了如何将增量的 AggregateFunction 与 ProcessWindowFunction 结合起来,计算平均值,同时将键和窗口与平均值一起发出。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
用 FoldFunction 进行增量窗口聚合 #
下面的例子展示了如何将增量式 FoldFunction 与 ProcessWindowFunction 相结合,以提取窗口中的事件数量,并返回窗口的键和结束时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
在 ProcessWindowFunction 中使用 per-窗口状态 #
除了访问 keyed 状态(任何富函数都可以),ProcessWindowFunction 还可以使用 keyed 状态,该状态的作用域是函数当前正在处理的窗口。在这种情况下,理解每个窗口状态所指的窗口是什么很重要。这里涉及到不同的"窗口"。
- 窗口是在指定窗口操作时定义的。这可能是1小时的滚动窗口或者2小时的滑动窗口,滑动1小时。
- 一个给定的键的定义窗口的实际实例。这可能是 12: 00 到 13: 00 的时间窗口,用户 ID xyz. 这是基于窗口定义的,会有很多窗口,基于作业当前正在处理的键的数量,基于事件属于什么时间段。
每个窗口的状态与这两者中的后一种挂钩。意思是说,如果我们处理了1000个不同键的事件,并且所有键的事件当前都属于 [12:00,13:00)
时间窗口,那么将有1000个窗口实例,每个窗口都有自己的键的per-窗口状态。
process()
调用接收到的 Context 对象上有两个方法允许访问这两种类型的状态。
globalState()
,允许访问不在窗口范围内的 keyed 状态。windowState()
,它允许访问同样作用于窗口的 keyed 状态。
如果你预计同一窗口会有多次发射,那么这个功能是很有帮助的,因为当你对晚到的数据有晚发射的情况,或者当你有一个自定义的触发器,做投机性的早期发射时,可能会发生这种情况。在这种情况下,你会在每个窗口状态下存储之前的发射信息或发射次数。
当使用窗口状态时,重要的是当窗口被清除时也要清理该状态。这应该发生在 clear()
方法中。
WindowFunction(Legacy) #
在一些可以使用 ProcessWindowFunction
的地方,你也可以使用 WindowFunction
。这是 ProcessWindowFunction
的旧版本,它提供的上下文信息较少,而且没有一些先进的功能,比如每个窗口的 keyed 状态。这个接口在某些时候会被废弃。
WindowFunction 的签名如下。
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
可以这样使用。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
触发器 #
触发器决定一个窗口(由窗口分配器形成)何时可以被窗口函数处理。每个 WindowAssigner
都有一个默认的触发器。如果默认的触发器不符合你的需求,你可以使用 trigger(...)
指定一个自定义的触发器。
触发器接口有五个方法,允许 Trigger 对不同的事件做出反应。
onElement()
方法对每个添加到窗口的元素都会被调用。onEventTime()
方法在注册的事件时间定时器启动时被调用。onProcessingTime()
方法在注册的处理时间计时器启动时被调用。onMerge()
方法与有状态的触发器相关,当两个触发器的对应窗口合并时,例如使用会话窗口时,就会合并两个触发器的状态。- 最后
clear()
方法在删除相应窗口时执行任何需要的操作。
关于以上方法有两点需要注意。
1)前三个方法通过返回一个 TriggerResult
来决定如何对其调用事件采取行动。动作可以是以下之一。
- CONTINUE:什么也不做。
- FIRE:触发计算。
- PURGE:清除窗口中的元素,以及
- FIRE_AND_PURGE:触发计算,之后清除窗口中的元素。
- 这些方法中的任何一种都可以用来注册处理时间或事件时间的定时器,以备将来的操作。
Fire 和 Purge #
一旦触发器确定一个窗口可以处理,它就会发射,即返回 FIRE 或 FIRE_AND_PURGE。这是窗口操作者发出当前窗口结果的信号。给定一个带有 ProcessWindowFunction 的窗口,所有的元素都会被传递给 ProcessWindowFunction(可能是在将它们传递给 evictor 之后)。带有 ReduceFunction、AggregateFunction 或 FoldFunction 的窗口只是简单地发出它们急切的聚合结果。
当一个触发器发射时,它可以是 FIRE 或 FIRE_AND_PURGE。FIRE 保留窗口的内容,而 FIRE_AND_PURGE 则删除其内容。默认情况下,预先实现的触发器只是 FIRE 而不清除窗口状态。
注意 Purging 将简单地删除窗口的内容,并将完整地保留任何关于窗口和任何触发状态的潜在元信息。
窗口分配器的默认触发器 #
WindowAssigner 的默认触发器适合于许多用例。例如,所有的事件时间窗口分配器都有一个 EventTimeTrigger 作为默认触发器。这个触发器仅仅是在水印通过窗口结束后就会触发。
注意:GlobalWindow 的默认触发器是 NeverTrigger,它永远不会触发。因此,在使用 GlobalWindow 时,您必须定义一个自定义的触发器。
注意:通过使用 trigger() 指定一个触发器,您将覆盖一个 WindowAssigner 的默认触发器。例如,如果你为 TumblingEventTimeWindows 指定了一个 CountTrigger,你将不再获得基于时间进度的窗口启动,而只能通过计数来获得。现在,如果你想同时基于时间和计数做出反应,你必须编写自己的自定义触发器。
内置和自定义触发器 #
Flink 内置了一些触发器。
- 前面已经提到过的, EventTimeTrigger 会根据水印测量的事件时间的进展而触发。
- 处理时间触发器(ProcessingTimeTrigger)基于处理时间而触发。
- CountTrigger 在一个窗口中的元素数量超过给定的限制时触发。
- PurgingTrigger 将另一个触发器作为参数,并将其转换为一个清洗触发器。
如果你需要实现一个自定义的触发器,你应该查看抽象的 Trigger 类。请注意,API 仍在不断发展,可能会在 Flink 的未来版本中改变。
Evictors #
Flink 的窗口模型允许在 WindowAssigner 和 Trigger 之外指定一个可选的 Evictor。这可以通过 evictor(...)
方法来完成(如本文开头所示)。Evictor 能够在触发器触发后和应用窗口函数之前和/或之后从窗口中移除元素。要做到这一点,Evictor 接口有两个方法。
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()
包含在窗口函数之前应用的驱逐逻辑,而 evictAfter()
包含在窗口函数之后应用的逻辑。在应用窗口函数之前被驱逐的元素将不会被它处理。
Flink 自带了三个预先实现的驱逐器。这三个是:
- CountEvictor:从窗口中保留最多用户指定数量的元素,并从窗口缓冲区开始丢弃剩余的元素。
- DeltaEvictor:取 DeltaFunction 和阈值,计算窗口缓冲区中最后一个元素和剩余元素之间的 delta,并删除 delta 大于或等于阈值的元素。
- TimeEvictor:以毫秒为单位的时间间隔作为参数,对于一个给定的窗口,它在其元素中找到最大的时间戳 max_ts,并删除所有时间戳小于 max_ts - interval 的元素。
默认情况下,所有预先实现的 evictor 都会在 window 函数之前应用其逻辑。
注意: 指定一个 evictor 可以防止任何预聚集,因为一个窗口的所有元素都必须在应用计算之前传递给 evictor。
注意 Flink 不保证窗口内元素的顺序。这意味着,虽然 evictor 可以从窗口的开头移除元素,但这些元素不一定是最先或最后到达的。
允许的延迟 #
当使用事件时间窗口时,可能会发生元素迟到的情况,也就是说,Flink 用来跟踪事件时间进度的水印已经超过了元素所属窗口的结束时间戳。关于 Flink 如何处理事件时间,请参见事件时间,尤其是迟到元素。
默认情况下,当水印超过窗口的结束时间时,晚期元素就会被删除。然而,Flink 允许为窗口操作者指定一个最大允许延迟。允许延迟指定了元素在被丢弃之前可以迟到多少时间,其默认值为0。 在水印通过窗口结束后但在其通过窗口结束前加上允许延迟之前到达的元素,仍然会被添加到窗口中。根据所使用的触发器,一个迟到但未被丢弃的元素可能会导致窗口再次启动。EventTimeTrigger 就属于这种情况。
为了使这个工作,Flink 会保持窗口的状态,直到它们的允许延迟过期。一旦发生这种情况,Flink 就会删除窗口并删除其状态,这一点在窗口生命周期部分也有描述。
默认情况下,允许的延迟被设置为0,也就是说,到达水印后面的元素将被丢弃。
您可以像这样指定允许的延迟。
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
注意 当使用 GlobalWindows 窗口分配器时,由于全局窗口的结束时间戳是 Long.MAX_VALUE,因此没有数据被认为是迟到数据。
作为侧输出获取迟到数据 #
使用 Flink 的侧输出功能,你可以得到一个被丢弃的迟到数据流。
首先,你需要在窗口化的数据流上使用 sideOutputLateData(OutputTag)
来指定你要获取迟到的数据。然后,你就可以在窗口化操作的结果上得到侧输出流。
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
迟到元素的考虑 #
当指定允许的延迟大于0时,在水印通过窗口结束后,窗口及其内容将被保留。在这些情况下,当一个迟到但未被丢弃的元素到达时,它可能会触发窗口的另一次发射。这些发射被称为晚期发射,因为它们是由晚期事件触发的,与主发射相反,主发射是窗口的第一次发射。在会话窗口的情况下,迟发可能会进一步导致窗口的合并,因为它们可能会"弥合"两个已经存在的、未合并的窗口之间的差距。
注意:你应该意识到,晚点发射的元素应该被视为之前计算的更新结果,也就是说,你的数据流将包含同一计算的多个结果。根据你的应用,你需要考虑到这些重复的结果,或者对它们进行重复复制。
处理窗口结果 #
窗口化操作的结果又是一个 DataStream,在结果元素中没有保留任何关于窗口化操作的信息,所以如果你想保留窗口的元信息,你必须在你的 ProcessWindowFunction
的结果元素中手动编码这些信息。在结果元素上设置的唯一相关信息是元素的时间戳。这被设置为处理过的窗口的最大允许时间戳,也就是结束时间戳-1,因为窗口结束时间戳是独占的。注意,这对事件时间窗口和处理时间窗口都是如此,即在窗口化操作后元素总是有一个时间戳,但这个时间戳可以是事件时间时间戳,也可以是处理时间时间戳。对于处理时间窗口来说,这没有特别的影响,但是对于事件时间窗口来说,加上水印与窗口的交互方式,使得连续的窗口化操作具有相同的窗口大小。我们将在看完水印如何与窗口交互后再谈这个问题。
水印和窗口的交互 #
在继续本节之前,你可能想看看我们关于事件时间和水印的章节。
当水印到达窗口操作符时,会触发两件事。
- 水印会触发计算所有窗口的最大时间戳(就是结束时间戳-1)小于新水印的窗口。
- 水印被转发到下游的操作中
直观地说,水印会"冲掉"任何在下游操作中被认为是晚期的窗口,一旦它们收到该水印。
连续的窗口操作 #
如前所述,计算窗口化结果的时间戳的方式以及水印与窗口的交互方式允许将连续的窗口化操作串在一起。当你想进行两个连续的窗口化操作时,如果你想使用不同的键,但仍然希望来自同一个上游窗口的元素最终出现在同一个下游窗口中,这就很有用。考虑这个例子。
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
在这个例子中,第一次操作的时间窗口 [0,5)
的结果也会在随后的窗口操作中最终出现在时间窗口 [0,5)
。这样就可以计算每个键的和,然后在第二个操作中计算同一窗口内的 top-k 元素。
有用的状态大小考虑 #
窗口可以在很长一段时间内(如几天、几周或几个月)被定义,因此会积累非常大的状态。在估算窗口计算的存储需求时,有几个规则需要牢记。
-
Flink 为每个元素所属的窗口创建一个副本。鉴于此,翻滚窗口为每个元素保留一个副本(一个元素正好属于一个窗口,除非它被后期丢弃)。相比之下,滑动窗口会给每个元素创建若干个,这一点在窗口分配器部分有解释。因此,大小为1天,滑动1秒的滑动窗口可能不是一个好主意。
-
ReduceFunction、AggregateFunction 和 FoldFunction 可以显著降低存储要求,因为它们热衷于聚合元素,每个窗口只存储一个值。相比之下,仅仅使用 ProcessWindowFunction 就需要累积所有元素。
-
使用 Evictor 可以防止任何预聚集,因为一个窗口的所有元素都必须在应用计算之前通过 evictor(见 Evictor)。
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html