流分析
— 焉知非鱼Streaming Analytics
Event Time 和 Watermarks #
介绍 #
Flink 明确支持三种不同的时间概念。
事件时间:事件发生的时间,由产生(或存储)该事件的设备记录的时间
摄取时间:Flink 在摄取事件时记录的时间戳。
处理时间:您的管道中的特定 operator 处理事件的时间。
为了获得可重复的结果,例如,在计算某一天股票在交易的第一个小时内达到的最高价格时,您应该使用事件时间(event time)。这样一来,结果就不会依赖于计算的时间。这种实时应用有时会使用处理时间(processing time),但这样一来,结果就会由该小时内恰好处理的事件决定,而不是由当时发生的事件决定。基于处理时间的计算分析会导致不一致,并使重新分析历史数据或测试新的实现变得困难。
使用事件时间 #
默认情况下,Flink 将使用处理时间(processing time)。要改变这一点,您可以设置时间特性(Time Characteristic)。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
如果你想使用事件时间,你还需要提供一个时间戳提取器和水印生成器,Flink 将使用它们来跟踪事件时间的进展。这将在下面的“使用水印”一节中介绍,但首先我们应该解释一下什么是水印。
水印 #
让我们通过一个简单的例子来说明为什么需要水印,以及它们是如何工作的。
在这个例子中,你有一个带时间戳的事件流,这些事件的到达顺序有些混乱,如下所示。显示的数字是时间戳,表示这些事件实际发生的时间。第一个到达的事件发生在时间 4,随后是更早发生的事件,在时间 2,以此类推。
··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →
现在想象一下,你正在尝试创建一个流排序器(stream sorter)。这个应用程序的目的是处理流中的每个事件,并发出一个新的流,其中包含相同的事件,但按时间戳排序。
一些观察:
(1)你的流排序器看到的第一个元素是 4, 但你不能马上把它作为排序流的第一个元素释放出来。它可能已经不按顺序到达,而更早的事件可能还没有到达。事实上,你对这个流的未来有一些神一样的知识,你可以看到,你的流排序器至少应该等到 2 到达后再产生任何结果。
一些缓冲,和一些延迟,是必要的。
(2)如果你做错了,你可能最终会永远等待。首先,排序器看到了一个来自时间 4 的事件,然后是一个来自时间 2 的事件。一个时间戳小于 2 的事件会不会永远到达?也许会,也许不会。也许不会。你可以永远等待,永远看不到 1。
最终你必须鼓起勇气,发出 2 作为排序流的开始。
(3)那么你需要的是某种策略,它定义了对于任何给定的时间戳事件,何时停止等待早期事件的到来。
这正是水印的作用–它们定义了何时停止等待早期(earlier)事件。
Flink 中的事件时间处理依赖于水印生成器,这些水印生成器将特殊的时间戳元素插入到流中,称为水印。时间 t 的水印是一种断言,即到时间 t 为止,流现在(可能)是完整的。
这个流排序器应该在什么时候停止等待,并推出2开始排序流?当一个时间戳为 2,或更大的水印到达时。
(4)你可以想象不同的策略来决定如何生成水印。
每一个事件都是在一些延迟之后到达的,而这些延迟是不同的,所以一些事件的延迟比其他事件更多。一个简单的方法是假设这些延迟被某个最大延迟所约束。Flink 将这种策略称为有界无序水印。很容易想象更复杂的水印方法,但对于大多数应用来说,固定的延迟已经足够好了。
延迟与完整性 #
关于水印的另一种思考方式是,水印让你这个流式应用的开发者能够控制延迟和完整性之间的权衡。与批处理不同的是,在批处理中,人们可以在产生任何结果之前完全了解输入,而在流式处理中,你最终必须停止等待看到更多的输入,并产生某种结果。
你可以积极地配置你的水印,用一个很短的延迟,从而承担在对输入不完全了解的情况下产生结果的风险–也就是说,一个可能是错误的结果,很快就产生了。或者你可以等待更长时间,并利用对输入流更完整的知识产生结果。
也可以实现混合解决方案,快速生成初始结果,然后在处理额外(后期)数据时对这些结果进行更新。对于某些应用来说,这是一种很好的方法。
延迟 #
迟到的定义是相对于水印而言的。水印(t)声明流在时间t之前是完整的;在这个水印之后的任何事件,如果时间戳 ≤t,则为延迟。
使用水印 #
为了执行基于事件时间的事件处理,Flink 需要知道与每个事件相关联的时间,还需要流包含水印。
实践练习中使用的 Taxi 数据源为你处理了这些细节。但在你自己的应用程序中,你必须自己处理这些事情,通常是通过实现一个类来实现,该类从事件中提取时间戳,并按需生成水印。最简单的方法是使用 WatermarkStrategy:
DataStream<Event> stream = ...
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);
窗口 #
Flink 具有非常有表现力的窗口语义。
在本节中,你将学习
- 如何使用窗口来计算无边界流的聚合。
- Flink 支持哪些类型的窗口,以及
- 如何实现一个窗口化聚合的 DataStream 程序?
介绍 #
在做流处理的时候,自然而然地想要计算流的有界子集的聚合分析,以回答这样的问题。
- 每分钟的页面浏览量
- 每个用户每周会话数
- 每个传感器每分钟的最高温度
用 Flink 计算窗口化分析依赖于两个主要的抽象。窗口分配器(Window Assigners)将事件分配给窗口(必要时创建新的窗口对象),窗口函数(Window Functions)应用于分配给窗口的事件。
Flink 的窗口 API 还有 Triggers 的概念,它决定什么时候调用窗口函数,还有 Evictors,它可以删除窗口中收集的元素。
在它的基本形式中,你将窗口化应用到像这样的 keyed stream 中。
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)
您也可以对 non-keyed stream 使用窗口化,但请记住,在这种情况下,处理将不会并行进行。
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)
窗口分配器 #
Flink 有几种内置的窗口分配器类型,下面进行说明。
一些例子说明这些窗口分配器的用途,以及如何指定它们:
-
滚动时间窗口
-
每分钟浏览量
-
TumblingEventTimeWindows.of(Time.minutes(1))
-
滑动时间窗口
-
每10秒计算的每分钟页面浏览量
-
SlidingEventTimeWindows.of(Time.min(1), Time.seconds(10))
-
会话窗口
-
每节课的页面浏览量,其中每节课之间至少有30分钟的间隔。
-
EventTimeSessionWindows.withGap(Time.minutes(30))
可以使用 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n) 中的一种指定持续时间。
基于时间的窗口分配器(包括会话窗口)有事件时间(event time)和处理时间(processing time)两种风味。这两种类型的时间窗口之间有显著的权衡。对于处理时间窗口,你必须接受这些限制:
- 不能正确处理历史数据。
- 不能正确处理失序数据。
- 结果将是非确定性的。
但具有较低延迟的优势。
当使用基于计数的窗口时,请记住,这些窗口将不会启动,直到一个批次完成。没有超时和处理部分窗口的选项,尽管你可以用自定义的触发器自己实现这种行为。
全局窗口分配器将每个事件(用相同的键)分配到同一个全局窗口。只有当你打算使用自定义触发器来做你自己的自定义窗口时,这才是有用的。在许多看似有用的情况下,您最好使用另一节中描述的 ProcessFunction。
窗口函数 #
对于如何处理窗口的内容,您有三个基本选项。
- 作为一个批次,使用一个 ProcessWindowFunction,它将被传递一个包含窗口内容的 Iterable。
- 以增量方式,使用 ReduceFunction 或 AggregateFunction,当每个事件被分配到窗口时被调用。
- 或两者结合,当窗口被触发时,ReduceFunction 或 AggregateFunction 的预聚集结果被提供给 ProcessWindowFunction。
这里是方法1和3的例子。每个实现都在1分钟的事件时间窗口中从每个传感器中找到峰值值,并产生一个包含(key, end-of-window-timestamp, max_value) 的 Tuples 流。
ProcessWindowFunction 示例 #
DataStream<SensorReading> input = ...
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());
public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // input type
Tuple3<String, Long, Integer>, // output type
String, // key type
TimeWindow> { // window type
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {
int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
在这个实现中,有几件事需要注意。
- 所有分配给窗口的事件都必须在 keyed Flink state 下被缓冲,直到窗口被触发。这可能是相当昂贵的。
- 我们的 ProcessWindowFunction 被传递了一个 Context 对象,其中包含了窗口的信息。它的接口是这样的:
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}
windowState 和 globalState 是您可以存储该键的所有窗口的 per-key, per-window, 或全局 per-key 信息的地方。例如,如果您想记录一些关于当前窗口的信息,并在处理后续窗口时使用这些信息,这可能会很有用。
递增聚合示例 #
DataStream<SensorReading> input = ...
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());
private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}
private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {
SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
请注意,Iterable<SensorReading>
将只包含一个读数–由 MyReducingMax 计算的 pre-aggregated 最大值。
迟来的事件 #
默认情况下,当使用事件时间窗口时,迟到的事件会被丢弃。窗口 API 有两个可选部分可以让您对此有更多的控制。
您可以使用名为“侧输出”的机制,安排将被丢弃的事件收集到一个备用的输出流中。下面是一个例子,说明这可能是什么样子的:
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event> lateStream = result.getSideOutput(lateTag);
您还可以指定允许的延迟时间间隔,在此期间,延迟事件将继续分配给相应的窗口(其状态将被保留)。默认情况下,每个延迟事件都会导致窗口函数再次被调用(有时称为延迟发射)。
换句话说,水印后面的元素会被丢弃(或发送到侧输出)。
比如说:
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
当允许的延迟大于零时,只有那些晚到会被丢弃的事件才会被发送到侧输出(如果已经配置了)。
惊喜 #
Flink 的 windowing API 的某些方面可能并不像你所期望的那样。基于 flink 用户邮件列表和其他地方的常见问题,这里有一些关于窗口的事实可能会让你感到惊讶。
滑动窗口会进行复制 #
滑动窗口分配器可以创建很多窗口对象,并会将每个事件复制到每个相关窗口中。例如,如果你每15分钟有一个长度为24小时的滑动窗口,每个事件将被复制到 4*24=96 个窗口中。
时间窗口与纪元对齐 #
仅仅因为你使用了一个小时的处理时间窗口,并且在 12:05 开始运行你的应用程序,并不意味着第一个窗口会在 1:05 关闭。第一个窗口将长达 55 分钟,并在 1:00 关闭。
但是请注意,滚动窗口和滑动窗口分配器采用一个可选的偏移参数,可以用来改变窗口的对齐方式。详情请参见滚动窗口和滑动窗口。
窗口可以跟随窗口 #
例如,这样做是可行的:
stream
.keyBy(t -> t.key)
.timeWindow(<time specification>)
.reduce(<reduce function>)
.timeWindowAll(<same time specification>)
.reduce(<same reduce function>)
你可能会期望 Flink 的运行时足够聪明,能够为你做这种并行的预聚合(前提是你使用的是 ReduceFunction 或 AggregateFunction),但事实并非如此。
之所以这样做的原因是,一个时间窗口产生的事件会根据窗口结束的时间分配时间戳。所以,例如,一个小时长的窗口产生的所有事件都会有标记一个小时结束的时间戳。任何消耗这些事件的后续窗口的持续时间应该与前一个窗口的持续时间相同,或者是其倍数。
空的时间窗口没有结果 #
只有当事件被分配到窗口时,才会创建窗口。因此,如果在给定的时间帧内没有事件,就不会报告结果。
迟来的事件会导致迟来的合并 #
会话窗口是基于可以合并的窗口的抽象。每个元素最初都被分配到一个新的窗口,之后只要窗口之间的间隙足够小,就会合并。这样一来,一个迟到的事件可以弥合分开两个之前独立的会话的差距,产生迟到的合并。
实践 #
与本节配套的实战练习是 Hourly Tips Exercise。
进一步阅读 #
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/streaming_analytics.html