Custom Serializer
— 焉知非鱼Custom Serializer
为你的 Flink 程序注册一个自定义的序列器
如果你在 Flink 程序中使用的自定义类型不能被 Flink 类型序列化器序列化,Flink 就会回到使用通用的 Kryo 序列化器。你可以用 Kryo 注册你自己的序列化器或像 Google Protobuf 或 Apache Thrift 这样的序列化系统。要做到这一点,只需在 Flink 程序的 ExecutionConfig 中注册类型类和序列化器。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
请注意,你的自定义序列化器必须扩展 Kryo 的序列化器类。在 Google Protobuf 或 Apache Thrift 的情况下,这已经为你完成了。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
为了使上面的例子有效,你需要在 Maven 项目文件(pom.xml)中加入必要的依赖关系。在依赖关系部分,为 Apache Thrift 添加以下内容。
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-thrift</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.11.0</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
对于 Google Protobuf,你需要以下 Maven 依赖。
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.7.0</version>
</dependency>
请根据需要调整两个库的版本。
使用 Kryo 的 JavaSerializer 的问题。 #
如果你为你的自定义类型注册了 Kryo 的 JavaSerializer,你可能会遇到 ClassNotFoundExceptions,即使你的自定义类型类包含在提交的用户代码 jar 中。这是由于 Kryo 的 JavaSerializer 的一个已知问题,它可能会错误地使用错误的 classloader。
在这种情况下,你应该使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 来代替解决这个问题。这是 Flink 中重新实现的 JavaSerializer,它可以确保使用用户代码类加载器。
更多细节请参考 FLINK-6025。
原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html