DataStream API 介绍
— 焉知非鱼Intro to the DataStream API
本次培训的重点是广泛地介绍 DataStream API,使你能够开始编写流式应用程序。
什么可以被流式化? #
Flink 的 DataStream API(Java 和 Scala)可以让你流化任何可以序列化的东西。Flink 自己的序列化器用于:
- 基本类型,即 String, Long, Integer, Boolean, Array
- 复合类型。Tuples, POJOs 和 Scala case classes
而 Flink 又回到了 Kryo 的其他类型。也可以在 Flink 中使用其他序列化器。特别是 Avro,得到了很好的支持。
Java 元组 和 POJO #
Flink 的本地序列化器可以有效地操作元组和 POJO。
元组
对于 Java,Flink 定义了自己的 Tuple0 到 Tuple25 类型。
Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;
POJO
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许"按名称"字段引用)。
- 类是公共的和独立的(没有非静态的内部类)。
- 该类有一个公共的无参数构造函数。
- 类(以及所有超级类)中的所有非静态、非瞬态字段要么是公共的(而且是非最终的),要么有公共的 getter- 和 setter- 方法,这些方法遵循 Java beans 中 getter 和 setter 的命名约定。
例如:
public class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
. . .
};
}
Person person = new Person("Fred Flintstone", 35);
Flink 的序列化器支持 POJO 类型的模式进化。
Scala 元组和 case class #
这些工作就像你期望的那样。
一个完整的例子 #
这个例子将一个关于人的记录流作为输入,并将其过滤为只包括成年人。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
this.name = name;
this.age = age;
};
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
}
流执行环境 #
每个 Flink 应用都需要一个执行环境,本例中的 env。流式应用需要使用一个 StreamExecutionEnvironment。
在你的应用程序中进行的 DataStream API 调用建立了一个作业图(job graph),这个作业图被附加到 StreamExecutionEnvironment 上。当调用 env.execute() 时,这个图会被打包并发送给 JobManager,JobManager 将作业并行化,并将它的片断分配给 Task Manager 执行。你的作业的每个并行片断将在一个任务槽(task slot)中执行。
注意,如果你不调用 execute(),你的应用程序将不会被运行。
这种分布式运行时取决于你的应用程序是可序列化的。它还要求所有的依赖关系都能在集群中的每个节点上使用。
基本的流源 #
上面的例子使用 env.fromElements(...)
构造了一个 DataStream[Person]
。这是一种方便的方法,可以将一个简单的流组合起来,用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。所以,你可以用这个方法来代替。
val people: List[Person] = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
val flintstones: DataStream[Person] = env.fromCollection(people);
另一种方便的方法是在原型开发时将一些数据导入流中,使用 socket:
val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
或从文件中读取:
val lines: DataStream[String] = env.readTextFile("file:///path");
在实际应用中,最常用的数据源是那些支持低延迟、高吞吐量并行读取并结合倒带和重放的数据源–这是高性能和容错的先决条件–如 Apache Kafka、Kinesis 和各种文件系统。REST API 和数据库也经常被用于流的丰富。
基本的流式接收器 #
上面的例子使用 adults.print()
将其结果打印到 task manager 的日志中(当在 IDE 中运行时,它将出现在你的 IDE 的控制台中)。这将在流的每个元素上调用 toString()
。
输出结果看起来像这样:
1> Fred: age 35
2> Wilma: age 35
其中 1>
和 2>
表示哪个子任务(即线程)产生的输出。
在生产中,常用的接收器括 StreamingFileSink、各种数据库和一些 pub-sub 系统。
调试 #
在生产中,你的应用程序将在远程集群或一组容器中运行。而如果它失败了,它将会远程失败。JobManager 和 TaskManager 日志对调试此类故障非常有帮助,但在 IDE 内部进行本地调试要容易得多,Flink 支持这一点。你可以设置断点,检查本地变量,并逐步检查你的代码。你也可以步入 Flink 的代码,如果你好奇 Flink 是如何工作的,这可以是一个很好的方式来了解它的内部结构。
实践 #
在这一点上,你知道了足够的知识,可以开始编码和运行一个简单的 DataStream 应用程序。克隆 flink-training repo,按照 README 中的说明操作后,进行第一个练习。过滤一个流(Ride Cleansing)。