Dataset 变换
— 焉知非鱼Dataset Transformations
DataSet 转换
本文档深入介绍了 DataSets 上可用的转换。关于 Flink Java API 的一般介绍,请参考编程指南。
对于密集索引的数据集中的压缩元素,请参考压缩元素指南。
Map #
Map 转换将用户定义的映射函数应用于 DataSet 的每个元素。它实现了一对一的映射,也就是说,函数必须准确地返回一个元素。
下面的代码将一个由整数对组成的 DataSet 转化为一个由整数组成的 DataSet。
val intPairs: DataSet[(Int, Int)] = // [...]
val intSums = intPairs.map { pair => pair._1 + pair._2 }
FlatMap #
FlatMap 转换在 DataSet 的每个元素上应用了一个用户定义的 flat-map
函数。这种映射函数的变体可以为每个输入元素返回任意多个结果元素(包括没有)。
下面的代码将一个文本行的 DataSet 转换为一个单词的 DataSet。
val textLines: DataSet[String] = // [...]
val words = textLines.flatMap { _.split(" ") }
MapPartition #
MapPartition 在一次函数调用中转换一个并行分区。map-partition 函数以 Iterable 的形式获取分区,并可以产生任意数量的结果值。每个分区中元素的数量取决于平行度和之前的操作。
下面的代码将文本行的 DataSet 转换为每个分区的计数 DataSet。
val textLines: DataSet[String] = // [...]
// Some is required because the return value must be a Collection.
// There is an implicit conversion from Option to a Collection.
val counts = texLines.mapPartition { in => Some(in.size) }
Filter #
过滤器转换将用户定义的过滤器函数应用于 DataSet 的每个元素,并且只保留那些函数返回为真的元素。
以下代码从数据集中删除所有小于零的整数。
val intNumbers: DataSet[Int] = // [...]
val naturalNumbers = intNumbers.filter { _ > 0 }
重要:系统假设函数不会修改应用谓词的元素。违反这个假设会导致错误的结果。
元组数据集的投影(Projection) #
Project
转换删除或移动 Tuple DataSet 的 Tuple 字段。project(int...)
方法通过其索引选择应该保留的 Tuple 字段,并定义它们在输出 Tuple 中的顺序。
投影(Projection)不需要定义用户函数。
下面的代码显示了在 DataSet 上应用 Project
转换的不同方法。
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
# scala
Not supported.
分组数据集上的变换 #
reduce
操作可以对分组的数据集进行操作。指定用于分组的键可以通过多种方式进行。
- 键表达式
- 键选择器函数
- 一个或多个字段位置键(仅限元组数据集)。
- case 类字段(仅 case 类)
请看一下 reduce
的例子,看看如何指定分组键。
换算分组数据集 #
应用于分组数据集的 Reduce
转换,使用用户定义的 Reduce
函数将每个分组换算为一个元素。对于每一组输入元素,一个 Reduce 函数将成对的元素连续组合成一个元素,直到每组只剩下一个元素。
请注意,对于一个 ReduceFunction
,返回对象的键字段应该与输入值相匹配。这是因为 reduce
是隐式可组合的,当传递给 reduce
运算符时,从 combine
运算符发出的对象又是按键分组的。
在按键表达式分组的数据集上进行 Reduce 操作 #
键表达式指定了 DataSet 中每个元素的一个或多个字段。每个键表达式都是一个公共字段的名称或一个 getter 方法。点号可以用来深入到对象中。键表达式 "*"
可以选择所有字段。下面的代码展示了如何使用键表达式对 POJO 数据集进行分组,并使用 reduce
函数对其进行换算。
// some ordinary POJO
class WC(val word: String, val count: Int) {
def this() {
this(null, -1)
}
// [...]
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
对按键选择器分组的数据集进行换算 #
键选择器函数从数据集的每个元素中提取一个键值。提取的键值用于对 DataSet 进行分组。下面的代码展示了如何使用键选择器函数对 POJO 数据集进行分组,并使用 reduce 函数对其进行换算。
// some ordinary POJO
class WC(val word: String, val count: Int) {
def this() {
this(null, -1)
}
// [...]
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
对按字段位置键分组的数据集进行换算(仅元组数据集) #
字段位置键指定了一个 Tuple DataSet 的一个或多个字段,这些字段被用作分组键。下面的代码显示了如何使用字段位置键和应用 reduce 函数。
val tuples = DataSet[(String, Int, Double)] = // [...]
// group on the first and second Tuple field
val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
对按 case 类字段分组的数据集进行换算 #
当使用 Case Classes 时,你也可以使用字段的名称来指定分组键。
case class MyClass(val a: String, b: Int, c: Double)
val tuples = DataSet[MyClass] = // [...]
// group on the first and second field
val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
在分组数据集上进行分组换算 #
应用在分组 DataSet 上的 GroupReduce 转换,会对每个组调用用户定义的 group-reduce
函数。这与 Reduce 之间的区别在于,用户定义的函数可以一次性获得整个组。该函数是在一个组的所有元素上用一个 Iterable
调用的,并且可以返回任意数量的结果元素。
在按字段位置键分组的数据集上进行分组 Reduce(只适用于元组数据集) #
下面的代码显示了如何从一个按 Integer 分组的 DataSet 中删除重复的字符串。
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).reduceGroup {
(in, out: Collector[(Int, String)]) =>
in.toSet foreach (out.collect)
}
对按键表达式、键选择器函数或 case 类字段分组的数据集进行分组换算 #
类似于 Reduce 变换中的键表达式、键选择器函数和 case 类字段的工作。
对排序组进行 GroupReduce #
一个 group-reduce
函数使用一个 Iterable 访问一个组的元素。可选地,Iterable 可以按照指定的顺序输出一个组的元素。在许多情况下,这有助于降低用户定义的 group-reduce
函数的复杂性,并提高其效率。
下面的代码显示了另一个例子,如何在一个由整数分组并按 String 排序的 DataSet 中删除重复的 String。
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
(in, out: Collector[(Int, String)]) =>
var prev: (Int, String) = null
for (t <- in) {
if (prev == null || prev != t)
out.collect(t)
prev = t
}
}
注意:如果在 reduce
操作之前,使用运算符的基于排序的执行策略建立了分组,那么 GroupSort 通常是免费的。
可组合的 GroupReduceFunctions #
与 reduce 函数不同,group-reduce
函数是不可隐式组合的。为了使一个分组换算函数可以组合,它必须实现 GroupCombineFunction
接口。
重要:GroupCombineFunction
接口的通用输入和输出类型必须等于 GroupReduceFunction
的通用输入类型,如下例所示。
// Combinable GroupReduceFunction that computes two sums.
class MyCombinableGroupReducer
extends GroupReduceFunction[(String, Int), String]
with GroupCombineFunction[(String, Int), (String, Int)]
{
override def reduce(
in: java.lang.Iterable[(String, Int)],
out: Collector[String]): Unit =
{
val r: (String, Int) =
in.iterator.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
// concat key and sum and emit
out.collect (r._1 + "-" + r._2)
}
override def combine(
in: java.lang.Iterable[(String, Int)],
out: Collector[(String, Int)]): Unit =
{
val r: (String, Int) =
in.iterator.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
// emit tuple with key and sum
out.collect(r)
}
}
在分组数据集上进行分组合并 #
GroupCombine
变换是可组合的 GroupReduceFunction
中 combine
步骤的泛化形式。与此相反,GroupReduce
函数中的 combine
步骤只允许从输入类型 I 到输出类型 I 的组合。这是因为 GroupReduce
函数中的 reduce
步骤期望输入类型 I。
在某些应用中,希望在执行额外的转换(例如减少数据大小)之前,将一个数据集合并成中间格式。这可以通过一个 CombineGroup
转换来实现,而且成本很低。
注意:对分组数据集的 GroupCombine
是在内存中以贪婪的策略执行的,它可能不会一次处理所有数据,而是分多个步骤进行。它也是在各个分区上执行的,而不像 GroupReduce
变换那样进行数据交换。这可能会导致部分结果。
下面的例子演示了如何使用 CombineGroup
变换来实现另一种 WordCount
。
val input: DataSet[String] = [..] // The words received as input
val combinedWords: DataSet[(String, Int)] = input
.groupBy(0)
.combineGroup {
(words, out: Collector[(String, Int)]) =>
var key: String = null
var count = 0
for (word <- words) {
key = word
count += 1
}
out.collect((key, count))
}
val output: DataSet[(String, Int)] = combinedWords
.groupBy(0)
.reduceGroup {
(words, out: Collector[(String, Int)]) =>
var key: String = null
var sum = 0
for ((word, sum) <- words) {
key = word
sum += count
}
out.collect((key, sum))
}
上面的另一种 WordCount
实现演示了 GroupCombine
如何在执行 GroupReduce
转换之前组合单词。上面的例子只是一个概念证明。请注意,组合步骤如何改变 DataSet 的类型,通常在执行 GroupReduce
之前需要进行额外的 Map 转换。
在分组元组数据集上进行聚合 #
有一些常用的聚合操作是经常使用的。Aggregate 转换提供了以下内置的聚合函数。
- Sum,
- Min,
- Max.
Aggregate 变换只能应用在 Tuple 数据集上,并且只支持字段位置键进行分组。
下面的代码显示了如何在按字段位置键分组的数据集上应用"聚合"变换。
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
要在一个 DataSet 上应用多个聚合,必须在第一个聚合之后使用 .and()
函数,也就是说 .aggregary(SUM, 0).and(MIN, 2)
会产生原始 DataSet 的字段 0 和字段 2 的最小值之和。与此相反,.aggregary(SUM,0).aggregary(MIN,2)
将在一个聚合上应用一个聚合。在给定的示例中,它将在计算字段 0 与字段 1 分组后产生字段 2 的最小值。
注意:聚合函数集将在未来得到扩展。
对分组元组数据集的 MinBy / MaxBy 函数 #
MinBy (MaxBy)
转换为每组元组选择一个元组。被选择的元组是一个或多个指定字段的值是最小(最大)的元组。用于比较的字段必须是有效的关键字段,即可比较的字段。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。
下面的代码显示了如何从 DataSet<Tuple3<Integer, String, Double>>
中选择具有相同 String 值的每组元组的 Integer 和 Double 字段最小值的元组。
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
.groupBy(1) // group DataSet on second field
.minBy(0, 2) // select tuple with minimum values for first and third field.
换算整个数据集 #
Reduce 转换将用户定义的 reduce
函数应用于一个数据集的所有元素。随后,reduce
函数将元素对组合成一个元素,直到只剩下一个元素。
下面的代码显示了如何对一个整数数据集的所有元素进行求和。
val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)
使用 Reduce 转换换算一个完整的 DataSet 意味着最后的 Reduce 操作不能并行完成。然而,reduce
函数是可以自动组合的,因此 Reduce 转换不会限制大多数用例的可扩展性。
对整个数据集进行分组换算 #
GroupReduce
转换将用户定义的 group-reduce
函数应用于 DataSet 的所有元素。group-reduce
可以遍历 DataSet 的所有元素,并返回任意数量的结果元素。
下面的示例展示了如何在一个完整的 DataSet 上应用 GroupReduce
转换。
val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())
注意:如果 group-reduce
函数不可组合,那么在一个完整的 DataSet 上的 GroupReduce
转换不能并行完成。因此,这可能是一个非常耗费计算的操作。请参阅上面的"可组合的 GroupReduceFunctions" 部分,了解如何实现可组合的 group-reduce
函数。
在完整的数据集上进行分组合并(GroupCombine) #
在一个完整的 DataSet 上的 GroupCombine 的工作原理类似于在一个分组的 DataSet 上的 GroupCombine。在所有节点上对数据进行分区,然后以贪婪的方式进行合并(即只有适合内存的数据才会一次性合并)。
在完整的 Tuple 数据集上进行聚合 #
有一些常用的聚合操作是经常使用的。Aggregate 转换提供了以下内置的聚合函数。
- Sum,
- Min, 和
- Max.
Aggregate 变换只能应用于 Tuple 数据集。
下面的代码显示了如何在一个完整的数据集上应用聚合转换。
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)
注意:扩展支持的聚合函数集是我们的路线图。
在完整的元组数据集上实现 MinBy / MaxBy #
MinBy (MaxBy)
转换从一个元组数据集中选择一个元组。被选择的元组是一个或多个指定字段的值是最小(最大)的元组。用于比较的字段必须是有效的键字段,即可比较的字段。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。
以下代码显示了如何从 DataSet<Tuple3<Integer, String, Double>>
中选择具有 Integer 和 Double 字段最大值的元组。
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input
.maxBy(0, 2) // select tuple with maximum values for first and third field.
Distinct #
Distinct 转换计算源 DataSet 中不同元素的 DataSet。下面的代码从 DataSet 中删除所有重复的元素。
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()
也可以使用以下方法改变 DataSet 中元素的区分方式。
- 一个或多个字段位置键(仅元组数据集)。
- 一个键选择器函数,或
- 一个键表达式
用字段位置键去重(Distinct) #
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)
用 KeySelector 函数去重(Distinct) #
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}
用键表达式去重(Distinct) #
// some ordinary POJO
case class CustomType(aName : String, aNumber : Int) { }
val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")
也可以用通配符表示使用所有字段:
// some ordinary POJO
val input: DataSet[CustomType] = // [...]
val output = input.distinct("_")
Join #
Join 转换将两个 DataSets 连接成一个 DataSet。两个数据集的元素在一个或多个键上进行连接(join),这些键可以通过使用
- 键选择器函数
- 一个或多个字段位置键(仅限 Tuple DataSet)。
- case 类字段
有几种不同的方法来执行 Join 转换,如下所示。
默认的 Join (Join into Tuple2) #
默认的 Join 变换会产生一个新的 Tuple DataSet,它有两个字段。每个元组在第一个元组字段中持有第一个输入 DataSet 的 join 元素,在第二个字段中持有第二个输入 DataSet 的匹配元素。
下面的代码显示了一个使用字段位置键的默认 Join 转换。
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Double, Int)] = // [...]
val result = input1.join(input2).where(0).equalTo(1)
用 Join 函数连接 #
Join 转换也可以调用用户定义的 join
函数来处理连接(joining)元组。join
函数接收第一个输入 DataSet 的一个元素和第二个输入 DataSet 的一个元素,并准确返回一个元素。
下面的代码使用键选择器函数执行了一个带有自定义 java 对象的 DataSet 和一个 Tuple DataSet 的连接,并展示了如何使用用户定义的连接(join)函数。
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
(rating, weight) => (rating.name, rating.points * weight._2)
}
用 Flat-Join 函数连接 #
类似于 Map 和 FlatMap,FlatJoin
的行为方式与 Join 相同,但它不是返回一个元素,而是可以返回(收集)、零个、一个或多个元素。
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
(rating, weight, out: Collector[(String, Double)]) =>
if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
}
用 Projection (Java Only) 连接 #
Join 变换可以使用投影(projection)构造结果元组,如下所示:
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(0)
// select and reorder fields of matching tuples
.projectFirst(0,2).projectSecond(1).projectFirst(1);
// scala
Not supported.
用数据集大小提示 Join #
为了引导优化器选择正确的执行策略,你可以提示要连接(join)的 DataSet 的大小,如下所示:
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
// hint that the second DataSet is very large
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
Join 算法提示 #
Flink 运行时可以以各种方式执行连接(join)。每一种可能的方式在不同的情况下都会优于其他方式。系统会尝试自动选择一种合理的方式,但也允许你手动选择一种策略,以防你想强制执行特定的连接(join)方式。
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
有以下提示:
-
OPTIMIZER_CHOOSES: 相当于完全不给提示,让系统来选择。
-
BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略。
-
BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略。
-
REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入都很大,这个策略就很好。注意:如果无法估计大小,也无法重新使用已有的分区和排序,系统就会使用这个默认的后备策略。
-
REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好。
-
REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来连接(join)这些输入。如果一个或两个输入都已经被排序,这个策略就很好。
外连接 #
OuterJoin
转换在两个数据集上执行左、右或全外连接。外连接与常规(内连接)类似,创建所有键值相等的元素对。此外,如果在另一侧没有找到匹配的键,“外侧"的记录(左、右,或者在完全的情况下两者都有)将被保留。匹配的一对元素(或一个元素和另一个输入的空值)被交给 JoinFunction 将这对元素变成一个元素,或交给 FlatJoinFunction 将这对元素变成任意多个(包括无)元素。
两个 DataSets 的元素都是在一个或多个键上连接的,这些键可以通过使用
- 键选择器函数
- 一个或多个字段位置键(仅限 Tuple DataSet)。
- case 类字段
OuterJoins 只支持 Java 和 Scala DataSet API。
用 Join 函数进行外连接 #
OuterJoin
转换调用一个用户定义的 join
函数来处理连接元组。join
函数接收第一个输入 DataSet 的一个元素和第二个输入 DataSet 的一个元素,并准确地返回一个元素。根据外连接的类型(左、右、全),连接函数的两个输入元素中可以有一个是空的。
下面的代码使用键选择器函数执行 DataSet 与自定义 java 对象和 Tuple DataSet 的左外连接,并展示了如何使用用户定义的连接函数。
case class Rating(name: String, category: String, points: Int)
val movies: DataSet[(String, String)] = // [...]
val ratings: DataSet[Ratings] = // [...]
val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
(movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
}
使用 Flat-Join 函数进行外连接 #
类似于 Map 和 FlatMap,一个带有 flat-join
函数的 OuterJoin 的行为与带有 join
函数的 OuterJoin 相同,但它不是返回一个元素,而是可以返回(收集)、零个、一个或多个元素。
Not supported.
Join 算法提示 #
Flink 运行时可以以各种方式执行外连接。每一种可能的方式在不同的情况下都会优于其他方式。系统试图自动选择一种合理的方式,但允许你手动选择一种策略,以防你想强制执行特定的外连接方式。
val input1: DataSet[SomeType] = // [...]
val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
有以下提示:
-
OPTIMIZER_CHOOSES: 相当于完全不给提示,让系统来选择。
-
BROADCAST_HASH_FIRST:广播第一个输入,并据此建立一个哈希表,由第二个输入探测。如果第一个输入的数据非常小,这是一个很好的策略。
-
BROADCAST_HASH_SECOND: 广播第二个输入,并从中建立一个哈希表,由第一个输入探测。如果第二个输入非常小,是一个很好的策略。
-
REPARTITION_HASH_FIRST:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第一个输入建立一个哈希表。如果第一个输入比第二个输入小,但两个输入仍然很大,这个策略就很好。
-
REPARTITION_HASH_SECOND:系统对每个输入进行分区(洗牌)(除非输入已经被分区),并从第二个输入建立一个哈希表。如果第二个输入比第一个输入小,但两个输入仍然很大,这个策略就很好。
-
REPARTITION_SORT_MERGE:系统对每个输入进行分区(洗牌)(除非输入已经分区),并对每个输入进行排序(除非已经排序)。通过对排序后的输入进行流式合并来连接(join)这些输入。如果一个或两个输入都已经被排序,这个策略就很好。
注意:目前还不是所有的外连接类型都支持所有的执行策略。
-
LeftOuterJoin 支持:
- OPTIMIZER_CHOOSES
- BROADCAST_HASH_SECOND
- REPARTITION_HASH_SECOND
- REPARTITION_SORT_MERGE
-
RightOuterJoin 支持:
- OPTIMIZER_CHOOSES
- BROADCAST_HASH_FIRST
- REPARTITION_HASH_FIRST
- REPARTITION_SORT_MERGE
-
FullOuterJoin 支持:
- OPTIMIZER_CHOOSES
- REPARTITION_SORT_MERGE
Cross #
Cross 变换将两个 DataSets 组合成一个 DataSet。它建立了两个输入数据集元素的所有 pairwise 组合,即建立了一个笛卡尔积。Cross 变换要么在每对元素上调用用户定义的 cross
函数,要么输出一个 Tuple2。这两种模式如下所示。
注意:Cross 是一个潜在的计算密集型操作,甚至可以挑战大型计算集群。
使用用户定义函数进行交叉运算 #
Cross 变换可以调用一个用户定义的 cross
函数。cross
函数接收第一个输入的一个元素和第二个输入的一个元素,并正好返回一个结果元素。
下面的代码展示了如何使用 cross
函数对两个 DataSets 进行交叉变换。
case class Coord(id: Int, x: Int, y: Int)
val coords1: DataSet[Coord] = // [...]
val coords2: DataSet[Coord] = // [...]
val distances = coords1.cross(coords2) {
(c1, c2) =>
val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
(c1.id, c2.id, dist)
}
用数据集大小提示交叉 #
为了引导优化器选择正确的执行策略,你可以提示要交叉的 DataSet 的大小,如下所示。
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.crossWithTiny(input2)
// hint that the second DataSet is very large
val result1 = input1.crossWithHuge(input2)
CoGroup #
CoGroup 转换联合(jointly)处理两个 DataSets 的组。两个 DataSets 根据定义的键进行分组,共享同一键的两个 DataSets 的组被一起交给用户定义的共组(co-group)函数。如果对于一个特定的键来说,只有一个 DataSet 有一个组,那么 co-group
函数就会和这个组以及一个空组一起被调用。共组(co-group)函数可以分别迭代两个组的元素,并返回任意数量的结果元素。
与 Reduce、GroupReduce 和 Join 类似,可以使用不同的键选择器方法来定义键。
数据集上的 CoGroup #
val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]
val output = iVals.coGroup(dVals).where(0).equalTo(0) {
(iVals, dVals, out: Collector[Double]) =>
val ints = iVals map { _._2 } toSet
for (dVal <- dVals) {
for (i <- ints) {
out.collect(dVal._2 * i)
}
}
}
Union #
产生两个 DataSets 的联合(union),这两个 DataSets 必须是同一类型。两个以上 DataSets 的联合(union)可以通过多个联合(union)调用来实现,如下所示。
val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]
val unioned = vals1.union(vals2).union(vals3)
Rebalance #
均匀地重新平衡 DataSet 的并行分区,以消除数据倾斜。
val in: DataSet[String] = // [...]
// rebalance DataSet and apply a Map transformation.
val out = in.rebalance().map { ... }
Hash-Partition #
在给定的键上对 DataSet 进行散列分割。键可以被指定为位置键、表达式键和键选择器函数(关于如何指定键,请参见 Reduce 示例)。
val in: DataSet[(String, Int)] = // [...]
// hash-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByHash(0).mapPartition { ... }
Range-Partition #
在给定的键上 Range-partitions 一个 DataSet。键可以被指定为位置键、表达式键和键选择器函数(关于如何指定键,请参见 Reduce 示例)。
val in: DataSet[(String, Int)] = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByRange(0).mapPartition { ... }
Sort Partition #
按照指定的顺序,在指定的字段上对 DataSet 的所有分区进行本地排序。字段可以被指定为字段表达式或字段位置(关于如何指定键,请参阅 Reduce 示例)。通过链式 sortPartition()
调用,可以在多个字段上对分区进行排序。
val in: DataSet[(String, Int)] = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
val out = in.sortPartition(1, Order.ASCENDING)
.sortPartition(0, Order.DESCENDING)
.mapPartition { ... }
First-n #
返回一个 DataSet 的前 n 个(任意)元素。First-n 可以应用于一个常规的 DataSet、一个分组的 DataSet 或一个分组排序的 DataSet。分组键可以被指定为键选择器函数或字段位置键(关于如何指定键,请参见 Reduce 示例)。
val in: DataSet[(String, Int)] = // [...]
// Return the first five (arbitrary) elements of the DataSet
val out1 = in.first(5)
// Return the first two (arbitrary) elements of each String group
val out2 = in.groupBy(0).first(2)
// Return the first three elements of each String group ordered by the Integer field
val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/dataset_transformations.html