Hadoop 的兼容性
— 焉知非鱼Hadoop Compatibility Beta
Hadoop 兼容性测试版 #
Flink 与 Apache Hadoop MapReduce 接口兼容,因此允许重用为 Hadoop MapReduce 实现的代码。
您可以:
- 在 Flink 程序中使用 Hadoop 的可写数据类型。
- 使用任何 Hadoop InputFormat 作为数据源。
- 使用任何 Hadoop 输出格式作为数据接收器。
- 将 Hadoop Mapper 用作 FlatMapFunction。
- 使用 Hadoop Reducer 作为 GroupReduceFunction。
本文档展示了如何将现有的 Hadoop MapReduce 代码与 Flink 一起使用。从 Hadoop 支持的文件系统读取代码,请参考连接到其他系统指南。
项目配置 #
对 Hadoop 输入/输出格式的支持是 flink-java 和 flink-scala Maven 模块的一部分,这些模块在编写 Flink 作业时总是需要的。这些代码位于 org.apache.flink.api.java.hadoop
和 org.apache.flink.api.scala.hadoop
中的 mapred 和 mapreduce API 的附加子包中。
对 Hadoop Mappers 和 Reducers 的支持包含在 flink-hadoop-compatibility
Maven 模块中。这段代码位于 org.apache.flink.hadoopcompatibility
包中。
如果您想重用 Mappers 和 Reducers,请在 pom.xml 中添加以下依赖关系。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.11.0</version>
</dependency>
另请参见如何配置 hadoop 依赖关系。
使用 Hadoop 输入格式 #
要使用 Flink 的 Hadoop InputFormats,必须先使用 HadoopInputs 实用程序类的 readHadoopFile 或 createHadoopInput 来包装格式。前者用于从 FileInputFormat 派生的输入格式,而后者必须用于通用的输入格式。通过使用 ExecutionEnvironmen#createInput
,产生的 InputFormat 可以用来创建数据源。
生成的 DataSet 包含 2 个元组,其中第一个字段是键,第二个字段是从 Hadoop InputFormat 中检索的值。
下面的示例展示了如何使用 Hadoop 的 TextInputFormat。
val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(LongWritable, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
// Do something with the data.
[...]
使用 Hadoop 输出格式 #
Flink 为 Hadoop OutputFormat 提供了一个兼容性封装器,它支持任何实现 org.apache.hadoop.mapred.OutputFormat 或扩展 org.apache.hadoop.mapreduce.OutputFormat 的类。OutputFormat 包装器希望它的输入数据是一个包含2个key和value的 DataSet。这些数据将由 Hadoop OutputFormat 处理。
下面的示例展示了如何使用 Hadoop 的 TextOutputFormat。
// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
new TextOutputFormat[Text, IntWritable],
new JobConf)
hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
hadoopResult.output(hadoopOF)
使用 Hadoop Mappers 和 Reducers #
Hadoop Mappers 在语义上等同于 Flink 的 FlatMapFunctions,Hadoop Reducers 等同于 Flink 的 GroupReduceFunctions。Flink 为 Hadoop MapReduce 的 Mapper 和 Reducer 接口的实现提供了封装器,也就是说,你可以在常规的 Flink 程序中重用你的 Hadoop Mapper 和 Reducer。目前,只支持 Hadoop 的 mapred API(org.apache.hadoop.mapred)的 Mapper 和 Reduce 接口。
包装器将一个 DataSet<Tuple2<KEYIN,VALUEIN>
作为输入,并产生一个 DataSet<Tuple2<KEYOUT,VALUEOUT>
作为输出,其中 KEYIN 和 KEYOUT 是键,VALUEIN 和 VALUEOUT 是 Hadoop 函数处理的 Hadoop 键值对的值。对于 Reducers,Flink 提供了一个包装器,用于带(HadoopReduceCombineFunction)和不带 Combiner(HadoopReduceFunction)的 GroupReduceFunction。包装器接受一个可选的 JobConf 对象来配置 Hadoop Mapper 或 Reducer。
Flink 的函数包装器有:
- sorg.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
- sorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, 和
- sorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.
并可作为常规的 Flink FlatMapFunctions 或 GroupReduceFunctions 使用。
下面的例子展示了如何使用 Hadoop Mapper 和 Reducer 函数:
// Obtain data to process somehow.
DataSet<Tuple2<LongWritable, Text>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
请注意:Reducer 包装器工作在 Flink 的 groupBy() 操作所定义的组上。它不考虑您在 JobConf 中设置的任何自定义分区器、排序或分组比较器。
完整的 Hadoop WordCount 示例 #
下面的示例展示了使用 Hadoop 数据类型、Input-和 OutputFormats 以及 Mapper 和 Reducer 实现的完整 WordCount 实现。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path(inputPath));
// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, LongWritable> hadoopOF =
new HadoopOutputFormat<Text, LongWritable>(
new TextOutputFormat<Text, LongWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);
// Execute Program
env.execute("Hadoop WordCount");
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/hadoop_compatibility.html