  • flink04 ----- 1. kafkaSource  2. Where kafkaSource offsets are stored  3. Writing Kafka data to Redis  4. Writing Kafka data to MySQL

    1. kafkaSource

      See the official documentation.
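      As a quick reference, here is a minimal sketch of a Kafka source; the broker address, group id and topic name are placeholders, not values from this post:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class MinimalKafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "demo-group");              // placeholder group id

            // Read the topic as plain strings
            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

            DataStreamSource<String> lines = env.addSource(consumer);
            lines.print();

            env.execute("MinimalKafkaSourceSketch");
        }
    }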

    2. Where kafkaSource offsets are stored

      By default the offsets are committed to Kafka's special topic (__consumer_offsets), but a configuration flag can be set so that they are not committed there.
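      For reference, this is the switch used in the demo below (when checkpointing is enabled, the offsets themselves are stored in Flink's checkpointed state; this flag only controls whether they are also committed back to Kafka):

    //Also commit the offsets to Kafka's __consumer_offsets topic at checkpoint time? The default is true.
    flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);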

      

    3. Writing Kafka data to Redis

      RedisSink does not support exactly-once; it only supports at-least-once.

    KafkaSourceToRedisDemo

    package cn._51doit.flink.day04;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    //This program takes 5 arguments: ckdir gid topic redisHost redisPassword
    public class KafkaSourceToRedisDemo {

        public static void main(String[] args) throws Exception{

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            //With checkpointing enabled, where are the offsets stored?
            env.enableCheckpointing(30000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
            //Keep the checkpoint data even after the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.setStateBackend(new FsStateBackend(args[0]));
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 30000));

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
            properties.setProperty("group.id", args[1]);
            properties.setProperty("auto.offset.reset", "earliest");
            //properties.setProperty("enable.auto.commit", "false");
            //If checkpointing is not enabled, FlinkKafkaConsumer commits the offsets to Kafka's special topic (__consumer_offsets) to avoid re-reading data.
            //That approach cannot achieve exactly-once.
            FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(args[2], new SimpleStringSchema(), properties);

            //Whether to also commit the Kafka offsets to Kafka's special topic at checkpoint time; the default is true
            flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

            DataStreamSource<String> lines = env.addSource(flinkKafkaConsumer);

            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });

            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

            SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
            //End of transformations
            //Use RedisSink to write the computed results to Redis

            //Configuration for the Jedis connection
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                    .setHost(args[3])
                    .setPassword(args[4])
                    .build();

            summed.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));

            env.execute("KafkaSourceDemo");

        }


        public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

            @Override
            public RedisCommandDescription getCommandDescription() {
                //The Redis command to use and the name of the outer hash key
                return new RedisCommandDescription(RedisCommand.HSET, "wc");
            }

            @Override
            public String getKeyFromData(Tuple2<String, Integer> data) {
                return data.f0; //which field of the record to use as the hash field (key)
            }

            @Override
            public String getValueFromData(Tuple2<String, Integer> data) {
                return data.f1.toString(); //which field of the record to use as the value
            }
        }
    }

    Note: by default the checkpoint data is deleted when the job is cancelled. To keep the checkpoint data after cancellation, set the following option:

    //Keep the checkpoint data even after the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
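    With RETAIN_ON_CANCELLATION the checkpoint directory survives cancellation, so the job can later be resubmitted from it, e.g. with flink run -s <retained-checkpoint-path>; the exact path under ckdir depends on the job, and this resume step is not shown in the original post.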

    Testing confirms this: even though the Redis sink only provides at-least-once semantics, Redis writes are idempotent (the same key is simply overwritten), so the end result is effectively exactly-once.

     In other words, at-least-once processing combined with an idempotent Redis sink achieves exactly-once results.

    Question: during a checkpoint, how does Flink keep operator state and keyed state consistent?

      To keep state consistent at checkpoint time, Flink injects barriers at the sources (the barrier mechanism), which briefly holds the sources back; the checkpoint only completes after every downstream operator has processed the records up to the barrier and snapshotted its state, which keeps the states consistent with each other and with the source offsets.
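      For comparison, here is a sketch of requesting exactly-once checkpoint semantics instead of the AT_LEAST_ONCE mode used in the demos above (same CheckpointConfig API); in this mode each operator aligns the barriers from all of its inputs before taking its snapshot:

    env.enableCheckpointing(30000);
    //Barriers are aligned across input channels before each snapshot
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);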

    4. Writing Kafka data to MySQL

    KafkaSourceToMySQLDemo   

    package cn._51doit.flink.day04;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    //This program takes 3 arguments: ckdir gid topic
    public class KafkaSourceToMySQLDemo {

        public static void main(String[] args) throws Exception{

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            //With checkpointing enabled, where are the offsets stored?
            env.enableCheckpointing(30000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
            //Keep the checkpoint data even after the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.setStateBackend(new FsStateBackend(args[0]));
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 30000));

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
            properties.setProperty("group.id", args[1]);
            properties.setProperty("auto.offset.reset", "earliest");
            //properties.setProperty("enable.auto.commit", "false");
            //If checkpointing is not enabled, FlinkKafkaConsumer commits the offsets to Kafka's special topic (__consumer_offsets) to avoid re-reading data.
            //That approach cannot achieve exactly-once.
            FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(args[2], new SimpleStringSchema(), properties);

            //Whether to also commit the Kafka offsets to Kafka's special topic at checkpoint time; the default is true
            flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

            DataStreamSource<String> lines = env.addSource(flinkKafkaConsumer);

            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });

            KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

            SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
            //End of transformations
            //Use MySqlSink to write the computed results to MySQL
            summed.addSink(new MySqlSink());

            env.execute("KafkaSourceToMySQLDemo");

        }

    }

    MySqlSink

    package cn._51doit.flink.day04;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class MySqlSink extends RichSinkFunction<Tuple2<String, Integer>> {

        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            //Create the database connection here; open() is called once per parallel sink instance
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        }

        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            //Idempotent upsert: insert the word count, or overwrite it if the key already exists
            PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_wordcount VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
            preparedStatement.setString(1, value.f0);
            preparedStatement.setLong(2, value.f1);
            preparedStatement.setLong(3, value.f1);
            preparedStatement.executeUpdate();
            preparedStatement.close();
        }

        @Override
        public void close() throws Exception {
            connection.close();
        }
    }
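    The sink above relies on ON DUPLICATE KEY UPDATE, so the word column must be a primary or unique key. The original post does not show the table definition; the sketch below assumes a schema (the column name "word" is an assumption, while "counts" matches the UPDATE clause in the sink):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    //One-off setup for the table assumed by MySqlSink; the schema is an assumption.
    //The key column must be PRIMARY KEY or UNIQUE for "ON DUPLICATE KEY UPDATE" to take effect.
    public class CreateWordCountTable {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS t_wordcount (" +
                        "word VARCHAR(255) PRIMARY KEY, " +
                        "counts BIGINT NOT NULL)");
            }
        }
    }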