  • Solving duplicate consumption when Flink consumes Kafka messages and stores the results in MySQL

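    Background

    In a recent project I used Flink to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there are plenty of Flink-consumes-Kafka examples online, but after looking around I found no article that solves the duplicate-consumption problem. I then searched the Flink website for how to handle this scenario. The site has no Exactly-Once example for Flink to MySQL, but it does have a similar example that solves end-to-end exactly-once delivery: the FlinkKafkaProducer011 class, which guarantees that messages sent to Kafka through it are Exactly-Once. It achieves this mainly by extending the TwoPhaseCommitSinkFunction class. For what TwoPhaseCommitSinkFunction does, see the previous article first: https://blog.51cto.com/simplelife/2401411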

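    Implementation idea

    Briefly, using this class means implementing its methods beginTransaction, preCommit, commit, and abort, which together give a pre-commit protocol: after an event has gone through its own processing logic it is pre-committed (preCommit); the real commit (commit) happens only if the pre-commit succeeds, and if the pre-commit fails, abort is called to roll the event back. This is combined with Flink's checkpoint mechanism, which stores the offset of each partition of the topic.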
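
    The order of the four callbacks can be sketched without Flink. The class below is only an in-memory model: `Txn`, `table`, and `TwoPhaseCommitSketch` are hypothetical names made up for illustration, and a plain list stands in for the database. It shows that rows become visible only after preCommit followed by commit, and that an aborted transaction leaves nothing behind.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal in-memory model of the four two-phase-commit callbacks (not Flink code). */
public class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for a database transaction: rows stay pending until commit. */
    static final class Txn {
        final List<String> pending = new ArrayList<>();
        boolean preCommitted = false;
    }

    /** Rows visible to readers, i.e. what has really been committed. */
    final List<String> table = new ArrayList<>();

    /** Opens a new, empty transaction. */
    Txn beginTransaction() {
        return new Txn();
    }

    /** Writes one record into the open transaction; nothing is visible yet. */
    void invoke(Txn txn, String value) {
        txn.pending.add(value);
    }

    /** Phase one: marks the transaction as ready to be committed. */
    void preCommit(Txn txn) {
        txn.preCommitted = true;
    }

    /** Phase two: makes the pending rows visible. Only legal after preCommit. */
    void commit(Txn txn) {
        if (!txn.preCommitted) {
            throw new IllegalStateException("commit called before preCommit");
        }
        table.addAll(txn.pending);
        txn.pending.clear();
    }

    /** Rollback: discards everything written in this transaction. */
    void abort(Txn txn) {
        txn.pending.clear();
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();

        // Happy path: both phases succeed, so the rows become visible.
        Txn ok = sink.beginTransaction();
        sink.invoke(ok, "1");
        sink.invoke(ok, "2");
        sink.preCommit(ok);
        sink.commit(ok);

        // Failure path: the transaction is aborted before it is committed.
        Txn failed = sink.beginTransaction();
        sink.invoke(failed, "3");
        sink.abort(failed);

        System.out.println(sink.table); // prints [1, 2]
    }
}
```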

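    Here is an example of the effect. Suppose a checkpoint runs every 10 s while FlinkKafkaConsumer011 consumes messages from Kafka in real time. After the messages have been consumed and processed, the sink pre-commits them to the database. If the pre-commit succeeds, the real database commit follows when the checkpoint completes at the 10 s mark, and Flink automatically records the consumed offsets in that checkpoint; the checkpoint data can be stored in HDFS.

    If the pre-commit fails, say at the 5 s mark, the Flink job starts restarting repeatedly (the restart strategy can be set in the configuration), and the next checkpoint is not taken, so the checkpoint still holds the offset of the last successful consumption. The data of the current interval was consumed but failed during pre-commit; note that it has not actually been inserted, because once preCommit fails the commit never happens. After you have dealt with the bad data and restarted the Flink job, it automatically resumes consuming from the last successful checkpoint, which is how Kafka-to-MySQL Exactly-Once is achieved.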
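
    The scenario above can be replayed as a small simulation. This is a sketch under simplifying assumptions, not Flink's implementation: a checkpoint is taken every `interval` records instead of every 10 s, the record `poison` fails exactly once (as if the bad data had been fixed before the restart), and the names `run`, `openTxn`, and `checkpointedOffset` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates checkpointed offsets plus transactional writes (not Flink code). */
public class CheckpointReplaySketch {

    /**
     * Feeds the values 1..total through a simulated sink and returns the rows
     * that end up committed. A checkpoint is taken every `interval` records;
     * the value `poison` fails the first time it is processed.
     */
    static List<Integer> run(int total, int interval, int poison) {
        List<Integer> committed = new ArrayList<>(); // rows visible in the "database"
        List<Integer> openTxn = new ArrayList<>();   // rows written since the last checkpoint
        int checkpointedOffset = 0;                  // offset to resume from after a failure
        boolean poisonFixed = false;
        int offset = 0;

        while (offset < total) {
            int value = offset + 1;
            openTxn.add(value); // the insert runs inside the open transaction

            if (value == poison && !poisonFixed) {
                // Failure: roll back the open transaction and rewind the source
                // to the offset stored in the last successful checkpoint.
                openTxn.clear();
                offset = checkpointedOffset;
                poisonFixed = true; // assume the bad record is repaired before the restart
                continue;
            }

            offset++;
            if (offset % interval == 0) {
                // Checkpoint: remember the offset and commit the transaction together.
                checkpointedOffset = offset;
                committed.addAll(openTxn);
                openTxn.clear();
            }
        }
        // A final checkpoint flushes whatever is still pending.
        committed.addAll(openTxn);
        return committed;
    }

    public static void main(String[] args) {
        // 20 records, a checkpoint every 10 records, record 15 fails once.
        // Values 11..14 are rolled back, replayed, and committed exactly once.
        System.out.println(run(20, 10, 15));
    }
}
```

    Because the offset and the committed rows advance only together, the replayed values 11 to 14 do not appear twice in the result.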

    Implementation: three classes

    1. StreamDemoKafka2Mysql.java
      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
      import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
      import org.apache.kafka.clients.consumer.ConsumerConfig;

      import java.util.Properties;

      /**
       * User: zzy
       * Date: 2019/5/28
       * Time: 8:40 PM
       *
       * Consumes Kafka messages and sinks them (custom sink) to MySQL,
       * guaranteeing Exactly-Once from Kafka to MySQL.
       */
      @SuppressWarnings("all")
      public class StreamDemoKafka2Mysql {
          private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Parallelism is 1 so that message order is easy to follow while testing; it can be raised.
              env.setParallelism(1);
              // Checkpoint settings
              // Start a checkpoint every 10 s (checkpoint interval)
              env.enableCheckpointing(10000);
              // Exactly-once checkpointing mode
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              // Leave at least 1 s between checkpoints (minimum pause)
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
              // A checkpoint must finish within 10 s or it is discarded (checkpoint timeout)
              env.getCheckpointConfig().setCheckpointTimeout(10000);
              // Only one checkpoint may be in progress at a time
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
              // Keep checkpoint data after the job is cancelled so it can be restored from a chosen checkpoint
              //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              // State backend: checkpoints can be stored on HDFS (the default is memory). Stored locally here for now.
              // env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

              // Kafka consumer settings
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzy:9092");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group2");
              // Interval for automatic Kafka partition discovery
              props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

              /* SimpleStringSchema yields only the Kafka message; JSONKeyValueDeserializationSchema also yields
                 the key, the value, and metadata such as topic, partition, and offset. */
              FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic_ExactlyOnce, new JSONKeyValueDeserializationSchema(true), props);

              // Add the Kafka source
              DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
              // System.out.println("streamSource:" + streamSource.print());
              streamSource.print();
              // Send the data downstream
              streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
              // Trigger execution
              env.execute(StreamDemoKafka2Mysql.class.getName());
          }
      }

    2. MySqlTwoPhaseCommitSink.java

      // JSONObject.parseObject is the fastjson API; this import was missing from the original listing.
      import com.alibaba.fastjson.JSONObject;
      import org.apache.flink.api.common.ExecutionConfig;
      import org.apache.flink.api.common.typeutils.TypeSerializer;
      import org.apache.flink.api.common.typeutils.base.VoidSerializer;
      import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
      import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      import java.sql.Connection;
      import java.sql.PreparedStatement;
      import java.sql.Timestamp;
      import java.text.SimpleDateFormat;
      import java.util.Date;

      /**
       * User: zzy
       * Date: 2019/5/28
       * Time: 8:47 PM
       *
       * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement two-phase commit.
       */
      public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode,Connection,Void> {

          private static final Logger log = LoggerFactory.getLogger(MySqlTwoPhaseCommitSink.class);

          public MySqlTwoPhaseCommitSink(){
              // Caveat: a java.sql.Connection is a live resource, so a copy restored from a
              // checkpoint cannot resume the original database transaction.
              super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);
          }

          /**
           * Writes one record to the database inside the open transaction. Called once per incoming record.
           * @param connection
           * @param objectNode
           * @param context
           * @throws Exception
           */
          @Override
          protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
              log.info("start invoke...");
              String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
              log.info("===>date:" + date + " " + objectNode);
              log.info("===>date:{} --{}",date,objectNode);
              String value = objectNode.get("value").toString();
              log.info("objectNode-value:" + value);
              JSONObject valueJson = JSONObject.parseObject(value);
              String value_str = (String) valueJson.get("value");
              String sql = "insert into `mysqlExactlyOnce_test` (`value`,`insert_time`) values (?,?)";
              // try-with-resources closes the statement; the transaction itself stays open on the connection
              try (PreparedStatement ps = connection.prepareStatement(sql)) {
                  ps.setString(1,value_str);
                  Timestamp value_time = new Timestamp(System.currentTimeMillis());
                  ps.setTimestamp(2,value_time);
                  // Message text ("data to insert") is kept as-is because it appears in the recorded log below
                  log.info("要插入的数据:{}--{}",value_str,value_time);
                  // Execute the insert statement
                  ps.execute();
              }
              // Deliberately trigger an exception for the test
              if(Integer.parseInt(value_str) == 15) {
                  System.out.println(1 / 0);
              }
          }

          /**
           * Gets a connection with auto-commit switched off (done inside getConnection).
           * @return
           * @throws Exception
           */
          @Override
          protected Connection beginTransaction() throws Exception {
              log.info("start beginTransaction.......");
              String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
              Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
              return connection;
          }

          /**
           * Pre-commit. The pre-commit logic here lives in the invoke method.
           * @param connection
           * @throws Exception
           */
          @Override
          protected void preCommit(Connection connection) throws Exception {
              log.info("start preCommit...");
          }

          /**
           * If invoke ran normally, commit the transaction.
           * @param connection
           */
          @Override
          protected void commit(Connection connection) {
              log.info("start commit...");
              DBConnectUtil.commit(connection);
          }

          /**
           * If invoke threw an exception, roll back the transaction; the next checkpoint is not taken either.
           * @param connection
           */
          @Override
          protected void abort(Connection connection) {
              log.info("start abort rollback...");
              DBConnectUtil.rollback(connection);
          }
      }

    3. DBConnectUtil.java

      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      import java.sql.DriverManager;
      import java.sql.SQLException;
      import java.sql.Connection;

      /**
       * User: zzy
       * Date: 2019/5/28
       * Time: 8:58 PM
       */
      public class DBConnectUtil {

          private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);

          /**
           * Gets a connection with auto-commit switched off.
           *
           * @param url
           * @param user
           * @param password
           * @return
           * @throws SQLException
           */
          public static Connection getConnection(String url, String user, String password) throws SQLException {
              try {
                  Class.forName("com.mysql.jdbc.Driver");
              } catch (ClassNotFoundException e) {
                  log.error("Failed to load com.mysql.jdbc.Driver", e);
              }
              Connection conn;
              try {
                  conn = DriverManager.getConnection(url, user, password);
                  log.info("Got connection: {}", conn);
              } catch (SQLException e) {
                  log.error("Failed to get connection, url:" + url + ",user:" + user, e);
                  // Rethrow: the original listing swallowed the error and then hit a
                  // NullPointerException on conn.setAutoCommit below.
                  throw e;
              }

              // Switch to manual commit
              conn.setAutoCommit(false);
              return conn;
          }

          /**
           * Commits the transaction, then closes the connection.
           */
          public static void commit(Connection conn) {
              if (conn != null) {
                  try {
                      conn.commit();
                  } catch (SQLException e) {
                      log.error("Failed to commit transaction, Connection:" + conn, e);
                  } finally {
                      close(conn);
                  }
              }
          }

          /**
           * Rolls the transaction back, then closes the connection.
           *
           * @param conn
           */
          public static void rollback(Connection conn) {
              if (conn != null) {
                  try {
                      conn.rollback();
                  } catch (SQLException e) {
                      log.error("Failed to roll back transaction, Connection:" + conn, e);
                  } finally {
                      close(conn);
                  }
              }
          }

          /**
           * Closes the connection.
           *
           * @param conn
           */
          public static void close(Connection conn) {
              if (conn != null) {
                  try {
                      conn.close();
                  } catch (SQLException e) {
                      log.error("Failed to close connection, Connection:" + conn, e);
                  }
              }
          }
      }

    4. Testing the code

    To make sending messages easy, I use a small program that writes to Kafka, sending one number per second from 1 to 20:

      // Imports were missing from the original listing; JSON.toJSONString is the fastjson API.
      import com.alibaba.fastjson.JSON;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      import java.util.Properties;

      public class KafkaUtils {
          // private static final String broker_list = "localhost:9092";
          private static final String broker_list = "zzy:9092";
          // Topic for the Flink read-Kafka, write-MySQL exactly-once test
          private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

          public static void writeToKafka2() throws InterruptedException {
              Properties props = new Properties();
              props.put("bootstrap.servers", broker_list);
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              // The old-style producer is deprecated; use the client from org.apache.kafka.clients.producer
              Producer<String, String> producer = new KafkaProducer<>(props);

              try {
                  for (int i = 1; i <= 20; i++) {
                      MysqlExactlyOncePOJO mysqlExactlyOnce = new MysqlExactlyOncePOJO(String.valueOf(i));
                      String json = JSON.toJSONString(mysqlExactlyOnce);
                      // No partition and no key: only the JSON value is set
                      ProducerRecord<String, String> record = new ProducerRecord<>(topic_ExactlyOnce, null, null, json);
                      producer.send(record);
                      System.out.println("Sent: " + json);
                      Thread.sleep(1000);
                  }
              } catch (Exception e) {
                  // The original listing swallowed this exception silently
                  e.printStackTrace();
              } finally {
                  // Flush buffered records, then release the producer
                  producer.flush();
                  producer.close();
              }
          }

          public static void main(String[] args) throws InterruptedException {
              writeToKafka2();
          }
      }

      // MysqlExactlyOncePOJO.java (a separate file; the annotations are Lombok's)
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;

      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class MysqlExactlyOncePOJO {
          private String value;
      }

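    Before the number 15 is sent, one checkpoint should already have been taken and the second checkpoint should be about due. The data consumed before the first checkpoint is successfully inserted into the database. When the number 15 is consumed, an exception is thrown by hand, so at that point the database should contain only the data committed after the first checkpoint; the data belonging to the second checkpoint is not inserted (the pre-commit has already failed, so the real commit never runs). The log from my experiment: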

      19/06/01 14:52:07 INFO TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Setting restore state in the FlinkKafkaConsumer: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
      19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
          auto.commit.interval.ms = 5000
          auto.offset.reset = latest
          bootstrap.servers = [zzy:9092]
          check.crcs = true
          client.id =
          connections.max.idle.ms = 540000
          enable.auto.commit = true
          exclude.internal.topics = true
          fetch.max.bytes = 52428800
          fetch.max.wait.ms = 500
          fetch.min.bytes = 1
          group.id = flink-consumer-group2
          heartbeat.interval.ms = 3000
          interceptor.classes = null
          internal.leave.group.on.close = true
          isolation.level = read_uncommitted
          key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
          max.partition.fetch.bytes = 1048576
          max.poll.interval.ms = 300000
          max.poll.records = 500
          metadata.max.age.ms = 300000
          metric.reporters = []
          metrics.num.samples = 2
          metrics.recording.level = INFO
          metrics.sample.window.ms = 30000
          partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
          receive.buffer.bytes = 65536
          reconnect.backoff.max.ms = 1000
          reconnect.backoff.ms = 50
          request.timeout.ms = 305000
          retry.backoff.ms = 100
          sasl.jaas.config = null
          sasl.kerberos.kinit.cmd = /usr/bin/kinit
          sasl.kerberos.min.time.before.relogin = 60000
          sasl.kerberos.service.name = null
          sasl.kerberos.ticket.renew.jitter = 0.05
          sasl.kerberos.ticket.renew.window.factor = 0.8
          sasl.mechanism = GSSAPI
          security.protocol = PLAINTEXT
          send.buffer.bytes = 131072
          session.timeout.ms = 10000
          ssl.cipher.suites = null
          ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
          ssl.endpoint.identification.algorithm = null
          ssl.key.password = null
          ssl.keymanager.algorithm = SunX509
          ssl.keystore.location = null
          ssl.keystore.password = null
          ssl.keystore.type = JKS
          ssl.protocol = TLS
          ssl.provider = null
          ssl.secure.random.implementation = null
          ssl.trustmanager.algorithm = PKIX
          ssl.truststore.location = null
          ssl.truststore.password = null
          ssl.truststore.type = JKS
          value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

      19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
      19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
      19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
      19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
      19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
          auto.commit.interval.ms = 5000
          auto.offset.reset = latest
          bootstrap.servers = [zzy:9092]
          check.crcs = true
          client.id =
          connections.max.idle.ms = 540000
          enable.auto.commit = false
          exclude.internal.topics = true
          fetch.max.bytes = 52428800
          fetch.max.wait.ms = 500
          fetch.min.bytes = 1
          group.id = flink-consumer-group2
          heartbeat.interval.ms = 3000
          interceptor.classes = null
          internal.leave.group.on.close = true
          isolation.level = read_uncommitted
          key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
          max.partition.fetch.bytes = 1048576
          max.poll.interval.ms = 300000
          max.poll.records = 500
          metadata.max.age.ms = 300000
          metric.reporters = []
          metrics.num.samples = 2
          metrics.recording.level = INFO
          metrics.sample.window.ms = 30000
          partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
          receive.buffer.bytes = 65536
          reconnect.backoff.max.ms = 1000
          reconnect.backoff.ms = 50
          request.timeout.ms = 305000
          retry.backoff.ms = 100
          sasl.jaas.config = null
          sasl.kerberos.kinit.cmd = /usr/bin/kinit
          sasl.kerberos.min.time.before.relogin = 60000
          sasl.kerberos.service.name = null
          sasl.kerberos.ticket.renew.jitter = 0.05
          sasl.kerberos.ticket.renew.window.factor = 0.8
          sasl.mechanism = GSSAPI
          security.protocol = PLAINTEXT
          send.buffer.bytes = 131072
          session.timeout.ms = 10000
          ssl.cipher.suites = null
          ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
          ssl.endpoint.identification.algorithm = null
          ssl.key.password = null
          ssl.keymanager.algorithm = SunX509
          ssl.keystore.location = null
          ssl.keystore.password = null
          ssl.keystore.type = JKS
          ssl.protocol = TLS
          ssl.provider = null
          ssl.secure.random.implementation = null
          ssl.trustmanager.algorithm = PKIX
          ssl.truststore.location = null
          ssl.truststore.password = null
          ssl.truststore.type = JKS
          value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

      19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
      19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
      19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
      {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"12"}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的数据:12--2019-06-01 14:52:07.616
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"13"}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的数据:13--2019-06-01 14:52:07.617
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"14"}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的数据:14--2019-06-01 14:52:07.618
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"15"}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的数据:15--2019-06-01 14:52:07.619
      {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
      {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
      {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start abort rollback...
      19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
      java.lang.ArithmeticException: / by zero
          at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
          at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
          at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
          at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
          at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
          at java.lang.Thread.run(Thread.java:748)
      19/06/01 14:52:07 INFO Task: Freeing task resources for Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903).
      19/06/01 14:52:07 INFO Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) [FAILED]
      19/06/01 14:52:07 INFO TaskExecutor: Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) c284f48cd0b113da4f68fd835e643903.
      19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
      java.lang.ArithmeticException: / by zero
          at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
          at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
          at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
          at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
          at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
          at java.lang.Thread.run(Thread.java:748)
      19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RUNNING to FAILING.
      ...
      19/06/01 14:52:07 INFO TaskExecutor: Discarding the results produced by task execution c284f48cd0b113da4f68fd835e643903.
      19/06/01 14:52:07 INFO ExecutionGraph: Try to restart or fail the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) if no longer possible.
      19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state FAILING to RESTARTING.
      19/06/01 14:52:07 INFO ExecutionGraph: Restarting the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89).
      19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RESTARTING to CREATED.
    215.  
      19/06/01 14:52:07 INFO CheckpointCoordinator: Restoring job a7188181ec45ab397d21bb1f928c7b89 from latest valid checkpoint: Checkpoint 3 @ 1559371921807 for a7188181ec45ab397d21bb1f928c7b89.
    216.  
      19/06/01 14:52:07 INFO CheckpointCoordinator: No master state to restore
    217.  
      19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state CREATED to RUNNING.
    218.  
      19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to SCHEDULED.
    219.  
      19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from SCHEDULED to DEPLOYING.
    220.  
      19/06/01 14:52:07 INFO ExecutionGraph: Deploying Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (attempt #33) to localhost
    221.  
      19/06/01 14:52:07 INFO TaskExecutor: Received task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1).
    222.  
      19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to DEPLOYING.
    223.  
      19/06/01 14:52:07 INFO Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING]
    224.  
      19/06/01 14:52:07 INFO Task: Loading JAR files for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
    225.  
      19/06/01 14:52:07 INFO Task: Registering task at network: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
    226.  
      19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
    227.  
      19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
    228.  
      19/06/01 14:52:07 INFO StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
    229.  
      19/06/01 14:52:07 INFO TwoPhaseCommitSinkFunction: MySqlTwoPhaseCommitSink 0/1 - restoring state
    230.  
      19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start commit...
    231.  
      19/06/01 14:52:07 ERROR DBConnectUtil: 提交事物失败,Connection:com.mysql.jdbc.JDBC4Connection@69ae3a8c
    232.  
      java.sql.SQLException: Unexpected exception encountered during query.
    233.  
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    234.  
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
    235.  
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
    236.  
      at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
    237.  
      at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2523)
    238.  
      at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1547)
    239.  
      at com.zzy.bigdata.flink.streaming.DBConnectUtil.commit(DBConnectUtil.java:56)
    240.  
      at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:103)
    241.  
      at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:30)
    242.  
      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommit(TwoPhaseCommitSinkFunction.java:200)
    243.  
      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:395)
    244.  
      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:353)
    245.  
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    246.  
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    247.  
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    248.  
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    249.  
      at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    250.  
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    251.  
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    252.  
      at java.lang.Thread.run(Thread.java:748)

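    The log shows that records 1-11 were written to the database successfully. When the number 15 was consumed, the sink failed with the `ArithmeticException: / by zero` thrown in `MySqlTwoPhaseCommitSink.invoke`. The last lines of the log show the transaction being rolled back and the connection being closed, and the commit attempted afterwards during state restore failed as well. As a result, the consumed records 12-15 are not inserted into the database, no new checkpoint is taken, and the checkpoint still holds the offsets recorded after the last successful consumption.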
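    The behaviour described above can be illustrated with a small simulation. This is a minimal plain-Java sketch, not Flink code: the class `TwoPhaseCommitSimulation`, its `run` method, and the checkpoint positions are hypothetical names and values chosen for illustration, and the sketch collapses `preCommit` and `commit` into a single step that runs when a checkpoint completes. The failure on the value 15 is modelled by aborting the open transaction, mirroring the exception seen in the log.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of checkpoint-aligned transactions; it does not use Flink or MySQL. */
public class TwoPhaseCommitSimulation {

    /** Outcome of one simulated run. */
    public static final class Result {
        /** Rows that became visible in the simulated table. */
        public final List<Integer> committedRows;
        /** Number of records covered by the last completed checkpoint. */
        public final int checkpointedOffset;

        Result(List<Integer> committedRows, int checkpointedOffset) {
            this.committedRows = committedRows;
            this.checkpointedOffset = checkpointedOffset;
        }
    }

    /**
     * Processes the records in order. Rows are buffered in an open transaction and only
     * become visible when a checkpoint completes. If failingValue is encountered, the
     * open transaction is aborted and processing stops, as a failed job would.
     */
    public static Result run(int[] records, int[] checkpointOffsets, int failingValue) {
        List<Integer> committed = new ArrayList<>();
        List<Integer> pending = new ArrayList<>(); // rows in the open transaction
        int checkpointedOffset = 0;

        for (int i = 0; i < records.length; i++) {
            if (records[i] == failingValue) {
                // Assumption: stands in for the exception thrown while handling the record.
                pending.clear(); // abort: uncommitted rows are discarded
                break;
            }
            pending.add(records[i]);

            int offset = i + 1;
            if (contains(checkpointOffsets, offset)) {
                // Checkpoint completed: commit the transaction and remember the offset.
                committed.addAll(pending);
                pending.clear();
                checkpointedOffset = offset;
            }
        }
        return new Result(committed, checkpointedOffset);
    }

    private static boolean contains(int[] values, int target) {
        for (int v : values) {
            if (v == target) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] records = new int[20];
        for (int i = 0; i < records.length; i++) {
            records[i] = i + 1;
        }
        // Hypothetical checkpoint positions: after the 5th, 11th and 18th record.
        Result result = run(records, new int[] {5, 11, 18}, 15);
        System.out.println("committed rows: " + result.committedRows);
        System.out.println("resume from offset: " + result.checkpointedOffset);
    }
}
```

    With these inputs the simulated table ends up holding 1 through 11 and the restart offset is 11, so records 12-15 would be read again after a restart rather than written twice. In a real job the commit can also fail or be retried after a restart, which this sketch does not model.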

    Database table: mysqlExactlyOnce_test

      CREATE TABLE `mysqlExactlyOnce_test` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `value` varchar(255) DEFAULT NULL,
        `insert_time` datetime DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    Data in the table

  • Original article: https://www.cnblogs.com/liuys635/p/12954917.html