批处理例子
— 焉知非鱼Batch Examples
Batch 示例 #
下面的示例程序展示了 Flink 的不同应用,从简单的单词计数到图形算法。这些代码样本说明了 Flink 的 DataSet API 的使用。
以下和更多例子的完整源代码可以在 Flink 源码库的 flink-examples-batch 模块中找到。
运行一个例子 #
为了运行一个 Flink 实例,我们假设你有一个正在运行的 Flink 实例。导航中的 “Quickstart” 和 “Setup” 选项卡描述了启动 Flink 的各种方法。
最简单的方法是运行 ./bin/start-cluster.sh
,默认情况下,它用一个 JobManager 和一个 TaskManager 启动一个本地集群。
Flink 的每个二进制版本都包含一个例子目录,其中有本页每个例子的 jar 文件。
要运行 WordCount 示例,请发出以下命令。
./bin/flink run ./examples/batch/WordCount.jar
其他的例子也可以用类似的方式启动。
请注意,许多例子在运行时没有传递任何参数,而是使用内置的数据。要使用真实数据运行 WordCount,你必须传递数据的路径。
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
请注意,非本地文件系统需要一个模式前缀,如 hdfs://
。
WordCount #
WordCount 是大数据处理系统中的 “Hello World”。它计算文本集合中的单词频率。该算法分两步工作。首先,文本被分割成单个单词。第二,对单词进行分组和计数。
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
WordCount 的例子实现了上面描述的算法,输入参数:--input <path> --output <path>
。作为测试数据,任何文本文件都可以。
页面排名 #
PageRank 算法计算由链接定义的图中页面的"重要性",这些链接从一个页面指向另一个页面。它是一种迭代图算法,这意味着它反复应用相同的计算。在每一次迭代中,每个页面将其当前的排名分布在所有的邻居上,并计算其新的排名,作为它从邻居那里得到的排名的累加和。PageRank 算法是由 Google 搜索引擎推广的,它利用网页的重要性来对搜索查询的结果进行排名。
在这个简单的例子中,PageRank 的实现方式是批量迭代和固定的迭代次数。
// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
// build adjacency list from link input
val adjacencyLists = links
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
result.writeAsCsv(outputPath, "\n", " ")
PageRank 程序实现了上述示例。它需要以下参数才能运行。--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
。
输入文件是纯文本文件,必须按以下格式进行。
- 页数用一个(长)ID 表示,用换行字符分隔。
- 例如 “1/n2/n12/n42/n63/n” 给出了 5 个 ID 为 1、2、12、42 和 63 的页面。
- 链接用页面 ID 对表示,用空格分隔。链接用换行符分隔。
- 例如 “1 2\n2 12\n1 12\n42 63\n” 给出了四个(定向)链接(1)->(2),(2)->(12),(1)->(12)和(42)->(63)。
对于这个简单的实现,要求每个页面至少有一个入站链接和一个出站链接(一个页面可以指向自己)。
连接的组件 #
Connected Components 算法通过给同一连接部分中的所有顶点分配相同的组件 ID,来识别较大图中相互连接的部分。与 PageRank 类似,Connected Components 是一种迭代算法。在每一步中,每个顶点将其当前的组件 ID 传播给所有的邻居。如果一个顶点接受来自邻居的组件 ID,如果它小于自己的组件 ID。
本实现使用增量迭代。没有改变组件 ID 的顶点不参与下一步。这产生了更好的性能,因为后面的迭代通常只处理一些离群的顶点。
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }
// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) =>
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
(edge._2, vertex._2)
}
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)
// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex, out: Collector[(Long, Long)]) =>
if (newVertex._2 < oldVertex._2) out.collect(newVertex)
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
ConnectedComponents 程序实现了上面的例子。它需要以下参数才能运行: --vertices <path> --edges <path> --output <path> --iterations <n>
。
输入文件是纯文本文件,必须按如下格式编写。
- 顶点用 ID 表示,并用换行符隔开。
- 例如 “1/n2/n12/n42/n63/n” 给出了五个顶点,分别是(1)、(2)、(12)、(42)和(63)。
- 边缘用一对顶点 ID 表示,这些顶点 ID 用空格字符分隔。边缘用换行符隔开。
- 例如,“1 2/n2 12/n1 12/n42 63/n” 给出了四个(非直接)联系(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/examples.html