数据管道和 ETL
— 焉知非鱼Data Pipelines & ETL
对于 Apache Flink 来说,一个非常常见的用例是实现 ETL(提取、转换、加载)管道,从一个或多个源中获取数据,进行一些转换和/或丰富,然后将结果存储在某个地方。在这一节中,我们将看看如何使用 Flink 的 DataStream API 来实现这种应用。
请注意,Flink的 Table 和 SQL API很适合许多 ETL 用例。但无论你最终是否直接使用 DataStream API,对这里介绍的基础知识有一个扎实的理解都是有价值的。
无状态转换 #
本节介绍了 map() 和 flatmap(),它们是用来实现无状态转换的基本操作。本节中的例子假设你熟悉 flink-training 仓库中的实战练习中使用的出租车乘车数据。
map() #
在第一个练习中,你过滤了一个打车事件的流,在同一个代码库中,有一个 GeoUtils 类,它提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),该方法将一个 location (longitude, latitude) 映射到一个网格单元,该单元指的是一个大约100x100米大小的区域。
现在让我们通过为每个事件添加 startCell 和 endCell 字段来丰富我们的打车对象流。你可以创建一个 EnrichedRide 对象,扩展 TaxiRide,添加这些字段。
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;
public EnrichedRide() {}
public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}
public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}
然后,您可以创建一个应用程序,将流转化为:
val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(...));
val enrichedNYCRides: DataStream[EnrichedRide] = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());
enrichedNYCRides.print();
使用这个 MapFunction:
class Enrichment extends MapFunction[TaxiRide, EnrichedRide] {
override def map(taxiRide: TaxiRide) {
return new EnrichedRide(taxiRide);
}
}
flatmap() #
MapFunction
只适用于执行一对一的转换:对于每一个进入的流元素,map()
将发出一个转换后的元素。否则,你将需要使用 flatmap()
:
val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(...));
val enrichedNYCRides: DataStream[EnrichedRide] = rides
.flatMap(new NYCEnrichment());
enrichedNYCRides.print();
加上一个 FlatMapFunction
:
class NYCEnrichment extends FlatMapFunction[TaxiRide, EnrichedRide] {
override def flatMap(taxiRide: TaxiRide, out: Collector[EnrichedRide]) {
val valid: FilterFunction[TaxiRide] = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}
通过这个接口提供的 Collector,flatmap()
方法可以随心所欲地发射许多流元素,包括完全不发射元素。
Keyed Streams #
keyBy() #
通常,能够围绕一个属性对一个流进行分区是非常有用的,这样所有具有相同属性值的事件就会被归为一组。例如,假设你想找到从每个网格单元开始的最长的出租车乘车时间。从 SQL 查询的角度考虑,这意味着要对 startCell 进行某种 GROUP BY,而在 Flink 中,这是用 keyBy(KeySelector)
来完成的。
rides
.flatMap(new NYCEnrichment())
.keyBy("startCell")
每一个 keyBy
都会引起一次网络洗牌,对流进行重新分区。一般来说,这是很昂贵的,因为它涉及到网络通信以及序列化和反序列化。
在上面的例子中,键是由一个字段名 “startCell” 指定的。这种键选择的风格有一个缺点,那就是编译器无法推断用于键选择的字段的类型,因此 Flink 会将键值作为元组传递,这可能会很笨拙。最好是使用一个正确类型的 KeySelector
,例如:
rides
.flatMap(new NYCEnrichment())
.keyBy(
new KeySelector<EnrichedRide, int>() {
@Override
public int getKey(EnrichedRide enrichedRide) throws Exception {
return enrichedRide.startCell;
}
})
可以用 lambda 更简洁地表达出来。
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)
Keys are computed #
KeySelectors 并不局限于从你的事件中提取一个键,相反,它们可以用任何你想要的方式来计算键,只要产生的键是确定性的,并且有有效的 hashCode()
和 equals()
的实现。这个限制排除了生成随机数,或者返回数组或枚举的 KeySelectors,但是你可以使用元组或 POJOs 来生成复合键,例如,只要它们的元素遵循这些相同的规则。
键必须以确定性的方式产生,因为每当需要它们时,它们就会被重新计算,而不是附加到流记录上。
例如,我们不是创建一个新的 EnrichedRide
类,该类有一个 startCell
字段,然后我们将其用作键:
keyBy(enrichedRide -> enrichedRide.startCell)
相反, 我们可以这样做:
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
Keyed 流的聚合 #
这段代码为每个 end-of-ride 事件创建一个新的元组流,其中包含 startCell
和持续时间(分钟)。
import org.joda.time.Interval;
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});
现在可以产生一个流,其中只包含那些对每个 startCell
来说是有史以来(至此)最长的乘车记录。
有多种方式可以表达作为键的字段。之前你看到了一个 EnrichedRide POJO 的例子,在这个例子中,要用作键的字段是用它的名字指定的。这个例子涉及到 Tuple2 对象,元组中的索引(从0开始)被用来指定键。
minutesByStartCell
.keyBy(0) // startCell
.maxBy(1) // duration
.print();
现在,每当持续时间达到一个新的最大值时,输出流就会包含一个针对每个键的记录–如这里的50797单元格所示。
...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)
(Implicit) State #
这是本次训练中第一个涉及有状态流的例子。虽然状态被透明地处理,但 Flink 必须跟踪每个不同键的最大持续时间。
每当状态涉及到你的应用时,你应该考虑状态可能会变得多大。每当键空间是无限制的,那么 Flink 需要的状态量也是无限制的。
当处理流时,一般来说,在有限的窗口上考虑聚合比在整个流上考虑更有意义。
reduce() 和其他聚合器 #
上文中使用的 maxBy()
只是 Flink 的 KeyedStreams 上众多聚合函数中的一个例子。还有一个更通用的 reduce()
函数,你可以用它来实现自己的自定义聚合。
状态转换 #
为什么 Flink 要参与管理状态? #
你的应用程序当然能够在没有让 Flink 参与管理状态的情况下使用状态–但 Flink 为它所管理的状态提供了一些引人注目的功能。
- 本地化。Flink 状态被保存在处理它的机器的本地,并且可以以内存速度被访问。
- 耐用。Flink 状态是容错的,即每隔一段时间就会自动检查一次,一旦失败就会恢复。
- 纵向可扩展。Flink 状态可以保存在嵌入式 RocksDB 实例中,通过增加更多的本地磁盘来扩展。
- 横向可扩展。随着集群的增长和收缩,Flink 状态会被重新分配。
- 可查询。Flink 状态可以通过可查询状态 API 进行外部查询。
在本节中,您将学习如何使用 Flink 的 API 管理 keyed 状态。
Rich 函数 #
此时你已经看到了 Flink 的几个函数接口,包括 FilterFunction
、MapFunction
和 FlatMapFunction
。这些都是单一抽象方法模式的例子。
对于每一个接口,Flink 还提供了一个所谓的"富"变体,例如,RichFlatMapFunction
,它有一些额外的方法,包括:
- open(Configuration c)
- close()
- getRuntimeContext()
open()
在操作符初始化期间被调用一次。这是一个加载一些静态数据的机会,或者, 例如打开一个外部服务的连接。
getRuntimeContext()
提供了对一整套潜在的有趣的东西的访问,但最值得注意的是它是如何创建和访问由 Flink 管理的状态。
一个带有 Keyed State 的例子 #
在这个例子中,想象一下,你有一个事件流,你想去掉重复,所以你只保留每个键的第一个事件。这里有一个应用程序可以做到这一点,使用一个名为 Deduplicator
的 RichFlatMapFunction
:
private static class Event {
public final String key;
public final long timestamp;
...
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();
env.execute();
}
为了达到这个目的,Deduplicator 将需要以某种方式记住,对于每个键来说,是否已经有了该键的事件。它将使用 Flink 的 keyed state 接口来做到这一点。
当你在使用像这样的 keyed 流时,Flink 将为每个被管理的状态项目维护一个键/值存储。
Flink 支持几种不同类型的 keyed state,本例使用的是最简单的一种,即 ValueState
。这意味着对于每个键,Flink 将存储一个单一的对象–在本例中,一个类型为 Boolean 的对象。
我们的 Deduplicator 类有两个方法:open()
和 flatMap()
。open
方法通过定义一个 ValueStateDescriptor` 来建立对托管状态的使用。构造函数的参数为这个 keyed state 项指定了一个名称(“keyHasBeenSeen”),并提供了可用于序列化这些对象的信息(在本例中,Types.BOOLEAN)。
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
当 flatMap
方法调用 keyHasBeenSeen.value()
时,Flink 的运行时会在上下文中查找 key 的这块状态值,只有当它为 null 时,它才会去收集事件到输出。在这种情况下,它还会将 keyHasBeenSeen
更新为 true。
这种访问和更新 key-partitioned 状态的机制可能看起来相当神奇,因为在我们的 Deduplicator 的实现中,key 并不是显式可见的。当 Flink 的运行时调用我们的 RichFlatMapFunction
的 open
方法时,没有任何事件,因此那一刻上下文中没有 key。但是当它调用 flatMap
方法时,被处理的事件的 key 对运行时来说是可用的,并在幕后用于确定 Flink 的状态后端中的哪个条目被操作。
当部署到分布式集群时,会有很多这个 Deduplicator 的实例,每个实例将负责整个键空间的一个不相干子集。因此,当你看到一个 ValueState 的单项,如:
ValueState<Boolean> keyHasBeenSeen;
理解这不仅仅是一个单一的布尔值,而是一个分布式的、分片式的、键/值存储。
清除状态 #
上面的例子有一个潜在的问题。如果键的空间是无限制的,会发生什么?Flink 是在某个地方为每一个被使用的不同键存储一个布尔的实例。如果有一个有界的键集,那么这将是很好的,但是在键集以无界的方式增长的应用中,有必要为不再需要的键清除状态。这是通过调用状态对象上的 clear()
来实现的,如:
keyHasBeenSeen.clear()
例如,你可能想在给定键的一段时间不活动后这样做。当你在事件驱动的应用程序一节中学习 ProcessFunction
时,你将看到如何使用 Timer
来实现这一点。
此外,还有一个状态存活时间(TTL)选项,你可以用状态描述符来配置,指定什么时候自动清除陈旧键的状态。
Non-keyed State #
也可以在 non-keyed 的上下文中使用托管状态。这有时被称为 operator state。所涉及的接口有些不同,由于用户定义的函数需要 non-keyed state 是不常见的,所以这里不做介绍。这个功能最常用于源和接收器(sink)的实现。
Connected Streams #
有时不是应用这样的预定义变换:
你希望能够动态地改变变换的某些方面–通过流的阈值,或规则,或其他参数。Flink 中支持这种模式的是一种叫做连接流(connected streams)的东西,其中一个 operator 有两个输入流,就像这样:
连接流也可以用来实现流式连接(streaming joins.)。
例子 #
在这个例子中,控制流被用来指定必须从 streamOfWords 中过滤掉的单词。一个名为 ControlFunction 的 RichCoFlatMapFunction 被应用到连接的流中来完成这个任务。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
control
.connect(datastreamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
注意,被连接的两个流必须以兼容的方式进行 keyed。keyBy 的作用是对流的数据进行分区,当 keyed 流连接时,必须以同样的方式进行分区。这样就可以保证两个流中具有相同 key 的事件都会被发送到同一个实例中。那么,这就使得将该键上的两个流连接起来成为可能,例如。
在这种情况下,两个流的类型都是 DataStream[String]
,并且两个流都以字符串为键。如下所示,这个 RichCoFlatMapFunction
在 keyed state 下存储了一个布尔值,而这个布尔值是由两个流共享的。
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
RichCoFlatMapFunction 是 FlatMapFunction 的一种,它可以应用于一对连接的流,并且它可以访问富函数接口。这意味着它可以被做成有状态的。
屏蔽的(blocked)布尔正在被用来记住控制流上提到的键(在这里是单词),这些词被过滤出 streamOfWords 流。这就是 keyed state,它在两个流之间是共享的,这就是为什么两个流要共享同一个键空间。
flatMap1
和 flatMap2
被 Flink 运行时调用,分别来自两个连接流的元素–在我们的例子中,来自控制流的元素被传入 flatMap1
,来自 streamOfWords
的元素被传入 flatMap2
。这是由使用 control.connect(datastreamOfWords)
连接两个流的顺序决定的。
重要的是要认识到,你无法控制调用 flatMap1
和 flatMap2
回调的顺序。这两个输入流在相互竞争,Flink 运行时将对来自一个流或另一个流的事件的消耗做它想做的事。在时间和/或顺序很重要的情况下,你可能会发现有必要在托管的 Flink 状态下缓冲事件,直到你的应用程序准备好处理它们。(注意:如果你真的很绝望,可以通过使用实现 InputSelectable 接口的自定义 Operator 来对双输入 operator 消耗输入的顺序进行一些有限的控制。)
实践 #
与本节配套的实践练习是“乘车与票价练习”。