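1 How Flink implements exactly-once
Both ends need an exactly-once guarantee: the consumer that pulls data from Kafka and the producer that writes data to Kafka. Flink currently has only a few sources that support exactly-once, the Kafka source among them, and only a few sinks that can achieve it, such as the Kafka sink and StreamingFileSink. All of them require checkpointing to be enabled. The rest of this section takes FlinkKafkaProducer as the example and walks through its source code to show how exactly-once semantics are implemented in Flink.
1.1 Overall flow (the two-phase commit principle)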
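1. The JobManager periodically (through the CheckpointCoordinator) sends a checkpoint request to every subtask that holds state
2. Each subtask writes its state to the configured state backend; each task slot corresponds to one file, into which the subtasks of that slot write their state
3. Each subtask sends a checkpoint-succeeded message to the JobManager
4. Once all subtasks have reported success, the JobManager notifies every subtask that took part in the checkpoint that it has completed
5. On that notification, each sink subtask commits the Kafka transaction that holds the data it wrote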
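The five steps above can be sketched with plain Java objects. This is only a toy model under stated assumptions: SketchSubtask and SketchCoordinator are hypothetical names, not Flink classes, and a list stands in for the Kafka topic as seen by a consumer that reads committed data only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transactional sink subtask; not a Flink class. */
class SketchSubtask {
    private final String name;
    private final List<String> openTxn = new ArrayList<>();  // records written since the last checkpoint
    private List<String> preCommitted = null;                // snapshotted, waiting for the global decision
    final List<String> visibleInKafka = new ArrayList<>();   // what a committed-only reader would see

    SketchSubtask(String name) {
        this.name = name;
    }

    void write(String record) {
        openTxn.add(name + ":" + record);
    }

    /** Phase 1 (steps 2 and 3): snapshot the open transaction and acknowledge. */
    boolean onCheckpoint() {
        preCommitted = new ArrayList<>(openTxn);
        openTxn.clear();
        return true;
    }

    /** Phase 2 (step 5): commit, called only after every subtask has acknowledged. */
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            visibleInKafka.addAll(preCommitted);
            preCommitted = null;
        }
    }
}

/** Hypothetical stand-in for the JobManager's checkpoint coordinator. */
class SketchCoordinator {
    /** Runs one checkpoint and returns true if phase 2 was reached. */
    static boolean runCheckpoint(List<SketchSubtask> subtasks) {
        boolean allAcked = true;
        for (SketchSubtask s : subtasks) {      // step 1: trigger the checkpoint
            allAcked &= s.onCheckpoint();       // steps 2 and 3: snapshot and acknowledge
        }
        if (allAcked) {
            for (SketchSubtask s : subtasks) {  // step 4: notify completion
                s.notifyCheckpointComplete();   // step 5: commit
            }
        }
        return allAcked;
    }
}
```

Nothing a subtask writes becomes visible until the coordinator has collected every acknowledgement, which is the property the two phases exist to provide.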
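Note: to keep operator state and keyed state consistent along a pipeline, Flink uses checkpoint barriers. A barrier is injected into the data stream at the sources and flows downstream together with the records. Each operator snapshots its state when the barrier reaches it (after aligning the barriers from all of its inputs), so every snapshot reflects exactly the records that came before the barrier.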
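A barrier travels in-band with the records, which can be illustrated with a single-input counting operator. This is a minimal sketch: BarrierSketch and the BARRIER marker are hypothetical, and barrier alignment across several inputs is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical counting operator; the stream is modelled as a list of strings. */
class BarrierSketch {
    /** Assumed marker that stands in for a checkpoint barrier. */
    static final String BARRIER = "<barrier>";

    /** Processes the stream and returns the count captured at each barrier. */
    static List<Integer> snapshotsAtBarriers(List<String> stream) {
        List<Integer> snapshots = new ArrayList<>();
        int count = 0;                     // the operator's state
        for (String element : stream) {
            if (BARRIER.equals(element)) {
                snapshots.add(count);      // the snapshot covers exactly the records before the barrier
            } else {
                count++;                   // ordinary records keep flowing between barriers
            }
        }
        return snapshots;
    }
}
```

For the stream a, b, barrier, c, barrier the captured snapshots are 2 and 3: the records after the first barrier belong to the next checkpoint.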
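Open question: Kafka involves one transaction on the producer side, for writing data, and one on the consumer side, for reading data. How are these two transactions related?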
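1.2 Source code walkthrough
(1) Start with the FlinkKafkaProducer class: it extends TwoPhaseCommitSinkFunction
(2) TwoPhaseCommitSinkFunction is the recommended base class for any SinkFunction that needs exactly-once semantics. It implements two important interfaces: CheckpointedFunction and CheckpointListener
- The CheckpointedFunction interface
This interface declares two methods, snapshotState and initializeState. The source is as follows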
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}
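snapshotState is called when a checkpoint is taken. It snapshots the state so that it is persisted to the state backend. For the Kafka producer, that state holds the transaction IDs, the subtask index, and the Kafka-related information needed to finish writing. If the checkpoint succeeds but the subtask fails before the data is committed to Kafka, this persisted state is what the job recovers from.
initializeState restores state. If state was persisted to the state backend but the data was never successfully committed to Kafka, this method restores the most recent state so that the write to Kafka can be completed.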
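The interplay of the two methods can be sketched without Flink. Everything here is hypothetical: SnapshotRestoreSketch is not a Flink class, a plain list plays the role of the state backend, and the only state kept is the id of the open transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink that tracks the id of its open transaction. */
class SnapshotRestoreSketch {
    private String currentTransactionId;

    void beginTransaction(String id) {
        currentTransactionId = id;
    }

    /** Counterpart of snapshotState: persist what is needed to finish the transaction later. */
    void snapshotState(List<String> stateBackend) {
        stateBackend.clear();
        stateBackend.add(currentTransactionId);
    }

    /** Counterpart of initializeState: returns the transaction ids that still need a commit. */
    List<String> initializeState(List<String> stateBackend) {
        return new ArrayList<>(stateBackend);
    }
}
```

A fresh instance created after a crash sees the transaction id that the failed instance snapshotted, which is what lets it finish or abort that transaction.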
- The CheckpointListener interface
This interface declares a single method, notifyCheckpointComplete
The source is as follows
/**
 * This interface must be implemented by functions/operations that want to receive
 * a commit notification once a checkpoint has been completely acknowledged by all
 * participants.
 */
@PublicEvolving
public interface CheckpointListener {

    /**
     * This method is called as a notification once a distributed checkpoint has been completed.
     *
     * Note that any exception during this method will not cause the checkpoint to
     * fail any more.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     * @throws Exception
     */
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}
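When is notifyCheckpointComplete called? Only after the subtasks of all partitions have acknowledged the checkpoint to the JobManager. The call tells each subtask that this checkpoint succeeded and that it can move on to the next step. The implementation in TwoPhaseCommitSinkFunction is: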
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, because notification messages
    //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
    //      was triggered) and because there can be concurrent overlapping checkpoints
    //      (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //

    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
            pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
    }
}
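The core of that loop, committing every pending transaction whose checkpoint id is not newer than the notified one, can be isolated in a few lines. This is a simplified sketch: PendingCommitSketch is a hypothetical class, transaction handles are plain strings, and error handling and logging are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder of pre-committed transactions, keyed by checkpoint id. */
class PendingCommitSketch {
    /** checkpointId -> transaction handle (a plain string here), in insertion order. */
    final Map<Long, String> pending = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();

    void preCommitted(long checkpointId, String txn) {
        pending.put(checkpointId, txn);
    }

    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue;                      // belongs to a later checkpoint, stays pending
            }
            committed.add(entry.getValue());   // stands in for commit(handle)
            it.remove();
        }
    }
}
```

With transactions pending for checkpoints 1, 2 and 3, a notification for checkpoint 2 commits the first two (scenario 2 in the comment) and leaves the third pending (scenario 3).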
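Note: besides telling each subtask that the checkpoint succeeded, this method also commits the transaction, through the commit call inside the loop. FlinkKafkaProducer implements that call as follows:
The commit method in FlinkKafkaProducer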
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}
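What happens if the transaction commit fails? The job restarts according to its restart strategy and calls initializeState to restore the most recent state, then continues writing to Kafka and committing. The retried commit does not go through commit; it goes through recoverAndCommit in FlinkKafkaProducer, which first resumes the transaction and then commits it (I have not fully traced this part of the source, so preCommit may also be involved). The source is as follows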
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try (
            FlinkKafkaInternalProducer<byte[], byte[]> producer =
                    initTransactionalProducer(transaction.transactionalId, false)) {
            producer.resumeTransaction(transaction.producerId, transaction.epoch);
            producer.commitTransaction();
        } catch (InvalidTxnStateException | ProducerFencedException ex) {
            // That means we have committed this transaction before.
            LOG.warn("Encountered error {} while recovering transaction {}. " +
                    "Presumably this transaction has been already committed before",
                    ex,
                    transaction);
        }
    }
}
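Note: Flink can guarantee that the checkpoint succeeds and that the transaction commit succeeds, but it cannot guarantee that both succeed together atomically. That is acceptable: if the checkpoint succeeded and the commit did not, the failed transaction is rolled back, the job restores its data from the state backend, writes to Kafka again, and commits the transaction.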
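The catch block in recoverAndCommit treats "already committed" as success, which is what makes a retried commit safe. The sketch below shows that pattern in isolation. It assumes a hypothetical FakeStore that rejects a second commit of the same id; it does not model Kafka's actual exceptions.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of a commit that may safely be retried after a restart. */
class RecoverAndCommitSketch {

    /** Hypothetical transactional store: committing the same id twice is an error. */
    static class FakeStore {
        private final Set<String> committed = new HashSet<>();

        void commit(String txnId) {
            if (!committed.add(txnId)) {
                throw new IllegalStateException("already committed: " + txnId);
            }
        }

        boolean isCommitted(String txnId) {
            return committed.contains(txnId);
        }
    }

    /** Returns true if this call performed the commit, false if it had already been done. */
    static boolean recoverAndCommit(FakeStore store, String txnId) {
        try {
            store.commit(txnId);
            return true;
        } catch (IllegalStateException alreadyCommitted) {
            return false;   // the earlier attempt got through before the failure
        }
    }
}
```

Calling recoverAndCommit twice with the same id leaves the store in the same state as calling it once, so a restart that cannot tell whether the previous commit went through can always retry.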
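2 A custom two-phase-commit sink
A custom two-phase-commit sink only works against a storage system that supports transactions, such as MySQL or Kafka 0.11 and later. In short, you extend TwoPhaseCommitSinkFunction and override its methods, as in the example below
A two-phase-commit sink for MySQL
Druid connection pool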
package cn._51doit.flink.day11;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

public class DruidConnectionPool {

    private static DataSource dataSource = null;

    private static Properties props = new Properties();

    static {
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8");
        props.put("username", "root");
        props.put("password", "123456");
        try {
            dataSource = DruidDataSourceFactory.createDataSource(props);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private DruidConnectionPool() {
    }

    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }
}
MySqlTwoPhaseCommitSink
package cn._51doit.flink.day11;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, MySqlTwoPhaseCommitSink.ConnectionState, Void> {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    // Open a connection and switch off auto-commit so that all writes join one transaction
    @Override
    protected MySqlTwoPhaseCommitSink.ConnectionState beginTransaction() throws Exception {
        System.out.println("=====> beginTransaction... ");
        //Class.forName("com.mysql.jdbc.Driver");
        //Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        Connection connection = DruidConnectionPool.getConnection();
        connection.setAutoCommit(false);
        return new ConnectionState(connection);
    }

    // Write one record inside the open transaction
    @Override
    protected void invoke(MySqlTwoPhaseCommitSink.ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
        Connection connection = connectionState.connection;
        // try-with-resources closes the statement even if the update throws
        try (PreparedStatement pstm = connection.prepareStatement(
                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?")) {
            pstm.setString(1, value.f0);
            pstm.setInt(2, value.f1);
            pstm.setInt(3, value.f1);
            pstm.executeUpdate();
        }
    }

    // Nothing to flush here: the rows already sit in the uncommitted JDBC transaction
    @Override
    protected void preCommit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) throws Exception {
        System.out.println("=====> preCommit... " + connectionState);
    }

    @Override
    protected void commit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> commit... ");
        Connection connection = connectionState.connection;
        try {
            connection.commit();
            connection.close();
        } catch (SQLException e) {
            // Keep the cause so the real failure is visible in the logs
            throw new RuntimeException("Failed to commit the transaction", e);
        }
    }

    @Override
    protected void abort(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> abort... ");
        Connection connection = connectionState.connection;
        try {
            connection.rollback();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to roll back the transaction", e);
        }
    }

    static class ConnectionState {

        private final transient Connection connection;

        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}
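3 Writing data to HBase
This approach combines HBase's idempotent writes with at-least-once delivery (Flink can restore its state, but the records between two checkpoints may be read again after a restore) to achieve exactly-once results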
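Why idempotent writes turn at-least-once delivery into exactly-once results can be shown with a map that stands in for the table. This is a sketch under assumptions: IdempotentWriteSketch is a hypothetical class, each record is a {rowKey, value} pair, and the row key and value of a replayed record are assumed to be identical to the first delivery.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: upserts keyed by row key tolerate replayed records. */
class IdempotentWriteSketch {

    /** Applies each {rowKey, value} record as an upsert and returns the table. */
    static Map<String, Double> apply(Map<String, Double> table, List<String[]> records) {
        for (String[] record : records) {
            // Writing the same key again replaces the visible value instead of adding a row
            table.put(record[0], Double.parseDouble(record[1]));
        }
        return table;
    }
}
```

Applying the same batch a second time, as happens when records are re-read after a restore, leaves the table unchanged. The guarantee breaks if the row key is not deterministic, for example when it is derived from processing time.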
HBaseUtil
package cn._51doit.flink.day11;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * HBase utility class that creates an HBase Connection
 */
public class HBaseUtil {

    /**
     * @param zkQuorum ZooKeeper addresses, comma-separated if there are several
     * @param port     ZooKeeper client port
     * @return an open HBase connection
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port + "");
        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}
MyHbaseSink
package cn._51doit.flink.day11;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.ArrayList;
import java.util.List;

public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {

    // Configuration must survive serialization of the function, so these fields are not transient
    private int maxSize = 1000;
    private long delayTime = 5000L;

    // Runtime-only fields, created in open() on the task manager
    private transient Connection connection;
    private transient long lastInvokeTime;
    private transient List<Put> puts;

    public MyHbaseSink() {}

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // Create the HBase connection
        connection = HBaseUtil.getConnection(
                params.getRequired("hbase.zookeeper.quorum"),
                params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );
        puts = new ArrayList<>(maxSize);
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        String rk = value.f0;
        Put put = new Put(rk.getBytes());
        put.addColumn("data".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
        // Buffer the Put
        puts.add(put);
        // Flush by batch size or by elapsed processing time
        long currentTime = System.currentTimeMillis();
        if (puts.size() >= maxSize || currentTime - lastInvokeTime >= delayTime) {
            flush();
            lastInvokeTime = currentTime;
        }
    }

    // Write the buffered Puts to the table and empty the buffer
    private void flush() throws Exception {
        if (puts == null || puts.isEmpty()) {
            return;
        }
        try (Table table = connection.getTable(TableName.valueOf("myorder"))) {
            table.put(puts);
        }
        puts.clear();
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            // Write whatever is still buffered before closing the connection
            flush();
            connection.close();
        }
    }
}
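The flush rule used by the sink, write when the batch is full or when enough time has passed, can be tested on its own if the clock is passed in. The following is a sketch: BatchBufferSketch is a hypothetical class, strings stand in for Put objects, and the flushed batches are collected in a list instead of being written to HBase.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that flushes by size or by elapsed time. */
class BatchBufferSketch {
    private final int maxSize;
    private final long maxDelayMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushTime;

    /** Every flushed batch, in order; stands in for table.put(batch). */
    final List<List<String>> flushed = new ArrayList<>();

    BatchBufferSketch(int maxSize, long maxDelayMillis, long now) {
        this.maxSize = maxSize;
        this.maxDelayMillis = maxDelayMillis;
        this.lastFlushTime = now;
    }

    /** Buffers one element and flushes if either threshold is reached. */
    void add(String element, long now) {
        buffer.add(element);
        if (buffer.size() >= maxSize || now - lastFlushTime >= maxDelayMillis) {
            flush(now);
        }
    }

    /** Emits the buffered elements as one batch, if there are any. */
    void flush(long now) {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushTime = now;
    }
}
```

The time condition is only evaluated when a new element arrives, so a quiet stream can hold records longer than the configured delay. Buffered records are also not part of the checkpointed state, so a sink built this way would likely need to flush when a checkpoint is taken to keep the at-least-once guarantee.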
4 ProtoBuf
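ProtoBuf is a serialization mechanism that stores data in binary form. It serializes and deserializes quickly, is compact (roughly one third the size of the equivalent JSON), and works across platforms and languages.
4.1 Trying out protobuf
(1) Create a Maven project
(2) Add the dependencies to the pom, as shown below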
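Where the space saving comes from can be seen by encoding one field by hand. The sketch below is a hand-rolled illustration of the protobuf wire format for a single non-negative int32 field, not the library's encoder. VarintSketch is a hypothetical class name.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical hand-rolled encoder for one non-negative int32 field. */
class VarintSketch {

    /** Encodes a non-negative int as a base-128 varint, low 7 bits first. */
    static byte[] encodeVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);  // set the continuation bit
            value >>>= 7;
        }
        out.write(value);                      // last byte, continuation bit clear
        return out.toByteArray();
    }

    /** Encodes an int32 field as its tag followed by the varint value. */
    static byte[] encodeInt32Field(int fieldNumber, int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] tag = encodeVarint(fieldNumber << 3);  // wire type 0 (varint) in the low 3 bits
        out.write(tag, 0, tag.length);
        byte[] body = encodeVarint(value);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }
}
```

Field number 1 with value 150 encodes to the three bytes 08 96 01, while the JSON text {"id":150} takes 10 bytes. The field name is never sent, only its number, which is a large part of the saving for small messages.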
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>protobuf-bean</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(3) Create a proto folder under the main directory and write the xxx.proto file in it, for example
syntax = "proto3";
option java_package = "cn._51doit.proto";
option java_outer_classname = "OrderProto";
message Order {
int32 id = 1;
string time = 2;
double money = 3;
}
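(4) The Maven plugins list now contains a protobuf plugin. Run its protobuf:compile goal to generate the protobuf bean source under the project's target directory (the same schema can be compiled for several languages)
(5) Move the generated proto bean to whichever directory you want
The test below converts JSON data into a protobuf bean, serializes the bean to bytes, and then deserializes the bytes back into a bean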
OrderProtoTest
package cn._51doit.test;

import cn._51doit.proto.OrderProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

import java.util.Arrays;

public class OrderProtoTest {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        // The inner quotes must be escaped inside a Java string literal
        String json = "{\"id\": 100, \"time\": \"2020-07-01\", \"money\": 66.66}";

        // Create a builder from the generated class
        OrderProto.Order.Builder bean = OrderProto.Order.newBuilder();

        // Copy the JSON fields into the bean
        JsonFormat.parser().merge(json, bean);

        // Overwrite the fields that were just parsed
        bean.setId(666);
        bean.setTime("2019-10-18");
        bean.setMoney(888.88);

        // Serialize: bean -> byte array
        byte[] bytes = bean.build().toByteArray();
        // Arrays.toString prints the content rather than the array reference
        System.out.println("binary: " + Arrays.toString(bytes));

        // Deserialize: byte array -> bean
        OrderProto.Order order = OrderProto.Order.parseFrom(bytes);
        System.out.println("object: " + order);
    }
}
4.2 Sending data to Kafka as ProtoBuf binary
DataToKafka
package cn._51doit.test;

import cn._51doit.proto.DataBeanProto;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class DataToKafka {

    public static void main(String[] args) {
        // 1 Configuration
        Properties props = new Properties();
        // Kafka brokers to connect to
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        String topic = "dataproto";

        // 2 Kafka producer
        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);

        DataBeanProto.DataBean.Builder bean = DataBeanProto.DataBean.newBuilder();
        DataBeanProto.DataBeans.Builder list = DataBeanProto.DataBeans.newBuilder();

        for (int i = 1; i <= 100; i++) {
            // Set the fields on the bean
            bean.setId(i);
            bean.setTitle("doit-" + i);
            bean.setUrl("www.51doit.cn");
            // Append the bean to the list
            list.addDataBean(bean);
            // Reset the builder for the next element
            bean.clear();

            if (list.getDataBeanCount() == 10) {
                // Convert the collection of beans to protobuf binary
                byte[] bytes = list.build().toByteArray();
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
                // Send 10 beans per message
                producer.send(record);
                producer.flush();
                list.clear();
            }
        }

        System.out.println("message send success");
        // Release resources
        producer.close();
    }
}
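4.3 Integrating a Kafka serializer with Flume's KafkaChannel
Requirements: (1) define a serializer for Kafka that converts the data to protobuf binary before it is written to Kafka
(2) Flink pulls that binary data from Kafka and converts it into ProtoBuf beans
(1) Implementing the Kafka serializer
The idea is to obtain a protobuf bean, then define a serializer that implements the Serializer interface and overrides serialize; the logic is in the code below. Package the code and put the jar in Flume's lib folder. Note that protobuf-java-2.5.0.jar in Flume's lib folder must be disabled (for example by renaming it) or deleted.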
KafkaProtoBufSerializer
package cn._51doit.test;

import cn._51doit.proto.UserProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class KafkaProtoBufSerializer implements Serializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        // Convert the data passed from the source to the channel into ProtoBuf binary
        // line is a JSON string; decode it with an explicit charset
        String line = new String(data, StandardCharsets.UTF_8);
        UserProto.User.Builder bean = UserProto.User.newBuilder();
        // Use the JsonFormat utility to set the JSON fields on the bean
        try {
            JsonFormat.parser().merge(line, bean);
        } catch (InvalidProtocolBufferException e) {
            // Malformed JSON: produce a null value instead of failing the channel
            return null;
        }
        // Return the ProtoBuf binary
        return bean.build().toByteArray();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, byte[] data) {
        // Newer Kafka producers call this overload; delegate so the payload is not replaced by an empty array
        return serialize(topic, data);
    }

    @Override
    public void close() {
    }
}
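(2) Implementing the Kafka deserializer for Flink
Note: besides the deserializer, which turns the protobuf binary data in the given Kafka topic into protobuf beans, you also need to specify how the bean itself is serialized (by registering a custom serializer class), so that Flink can ship the records over the network while processing them
DataBeanProto (the bean, cross-language)
Generated as described in 4.1
DataBeansDeserializer (deserializer)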
package cn._51doit.flink.day11;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * Custom Flink deserializer
 */
public class DataBeansDeserializer implements DeserializationSchema<DataBeanProto.DataBeans> {

    // Deserialize
    @Override
    public DataBeanProto.DataBeans deserialize(byte[] message) throws IOException {
        return DataBeanProto.DataBeans.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(DataBeanProto.DataBeans nextElement) {
        return false;
    }

    @Override
    public TypeInformation<DataBeanProto.DataBeans> getProducedType() {
        return TypeInformation.of(DataBeanProto.DataBeans.class);
    }
}
PBSerializer (serializer)
package cn._51doit.flink.day11;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.Message;

import java.lang.reflect.Method;
import java.util.HashMap;

public class PBSerializer extends Serializer<Message> {

    /* This cache never clears, but only scales like the number of
     * classes in play, which should not be very large.
     * We can replace with a LRU if we start to see any issues.
     */
    final protected HashMap<Class, Method> methodCache = new HashMap<Class, Method>();

    /**
     * This is slow, so we should cache to avoid killing perf:
     * See: http://www.jguru.com/faq/view.jsp?EID=246569
     */
    protected Method getParse(Class cls) throws Exception {
        Method meth = methodCache.get(cls);
        if (null == meth) {
            meth = cls.getMethod("parseFrom", new Class[]{ byte[].class });
            methodCache.put(cls, meth);
        }
        return meth;
    }

    // Serialize
    @Override
    public void write(Kryo kryo, Output output, Message mes) {
        byte[] ser = mes.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
    }

    // Deserialize
    @Override
    public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
            int size = input.readInt(true);
            byte[] barr = new byte[size];
            input.readBytes(barr);
            return (Message) getParse(pbClass).invoke(null, barr);
        } catch (Exception e) {
            throw new RuntimeException("Could not create " + pbClass, e);
        }
    }
}
Test class
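The serializer combines two techniques: a length prefix in front of the payload, and a reflective call to the static parseFrom method of the concrete message class. The sketch below reproduces both with the JDK only. GreetingMessage and LengthPrefixedSketch are hypothetical classes, and a fixed 4-byte length is used where Kryo writes a variable-length int.

```java
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical message type exposing the two methods the serializer relies on. */
class GreetingMessage {
    final String text;

    GreetingMessage(String text) {
        this.text = text;
    }

    byte[] toByteArray() {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static GreetingMessage parseFrom(byte[] bytes) {
        return new GreetingMessage(new String(bytes, StandardCharsets.UTF_8));
    }
}

/** Hypothetical length-prefixed framing with reflective parsing. */
class LengthPrefixedSketch {

    /** Writes a 4-byte length followed by the message bytes. */
    static byte[] write(GreetingMessage message) {
        byte[] payload = message.toByteArray();
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Reads the length, then the payload, then calls the class's static parseFrom. */
    static Object read(byte[] framed, Class<?> messageClass) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        try {
            Method parseFrom = messageClass.getMethod("parseFrom", byte[].class);
            // The cast keeps the byte array from being spread as varargs
            return parseFrom.invoke(null, (Object) payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create " + messageClass, e);
        }
    }
}
```

The length prefix tells the reader how many bytes belong to this message when several objects share one stream, and the reflective lookup lets one serializer serve every generated message class. Caching the Method per class, as PBSerializer does, avoids repeating the lookup for each record.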
ProtoBufDemo
package cn._51doit.flink.day11;

import cn._51doit.flink.day10.FlinkUtilsV2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

public class ProtoBufDemo {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        DataStream<DataBeanProto.DataBeans> dataBeansStream = FlinkUtilsV2.createKafkaDataStream(
                parameters, "dataproto", "gid", DataBeansDeserializer.class);

        // Register the custom serializer class
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBeans.class, PBSerializer.class);
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBean.class, PBSerializer.class);

        // Unpack each batch into its individual beans
        SingleOutputStreamOperator<DataBeanProto.DataBean> dataBeanStream = dataBeansStream.flatMap(
                new FlatMapFunction<DataBeanProto.DataBeans, DataBeanProto.DataBean>() {
                    @Override
                    public void flatMap(DataBeanProto.DataBeans list, Collector<DataBeanProto.DataBean> out) throws Exception {
                        for (DataBeanProto.DataBean dataBean : list.getDataBeanList()) {
                            out.collect(dataBean);
                        }
                    }
                });

        dataBeanStream.print();
        FlinkUtilsV2.getEnv().execute();
    }
}