数据集中的 zipping 元素
— 焉知非鱼Zipping Elements in a Dataset
Zipping 数据集中的元素 #
在某些算法中,人们可能需要为数据集元素分配唯一的标识符。本文档介绍了如何将 DataSetUtils 用于该目的。
使用密集索引进行 Zip #
zipWithIndex
给元素分配连续的标签,接收一个数据集作为输入,并返回一个新的(唯一id,初始值)2-tuples的数据集。这个过程需要两次传递,先计数再给元素贴标签,而且由于计数的同步性,不能采用流水线方式。备选的 zipWithUniqueId
以流水线的方式工作,当唯一的标签已经足够时,首选 zip
。例如,下面的代码。
import org.apache.flink.api.scala._
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
val result: DataSet[(Long, String)] = input.zipWithIndex
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
可以得到元组: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
带有唯一标识符的 Zip #
在许多情况下,人们可能不需要分配连续的标签,zipWithUniqueId
以流水线的方式工作,加快了标签分配过程。该方法接收一个数据集作为输入,并返回一个由(唯一id,初始值)2-tuples组成的新数据集。例如,下面的代码。
import org.apache.flink.api.scala._
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
val result: DataSet[(Long, String)] = input.zipWithUniqueId
result.writeAsCsv(resultPath, "\n", ",")
env.execute()
可以得到元组: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/zip_elements_guide.html