  • Spark 2.3 + Kafka 0.10 integration: managing offsets manually

    Code example:

    package cn.com.kong.streaming;
    
    
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.*;
    import scala.Tuple2;
    import scala.collection.JavaConverters;
    import scala.collection.Seq;
    
    import java.io.IOException;
    import java.util.*;
    
    /**
     * HBase table used to store the offsets (create it in the HBase shell first):
     * create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
     */
    public class KafkaOffsetToHBase {
    
        public static void main(String[] args) throws InterruptedException, IOException {
    
    //        if (args.length < 6) {
    //            System.err.println("Usage: KafkaOffsetToHBase <batch-duration-in-seconds> <kafka-bootstrap-servers> " +
    //                    "<kafka-topics> <kafka-consumer-group-id> <hbase-table-name> <kafka-zookeeper-quorum>");
    //            System.exit(1);
    //        }
            long batchDuration = 10L;
            String bootstrapServers = "172.30.xx.xx:9092,172.30.x.xx:9092";
            String consumerGroupID = "group01";
            String topic = "topic02";
            String hbaseTableName = "stream_kafka_offsets";
            String zkQuorum = "172.30.x.xx:2181,172.30.x.xx:2181,172.30.x.xxx:2181";
            String zkKafkaRootDir = "brokers";
            int zkSessionTimeOut = 10000;
            int zkConnectionTimeOut = 10000;
    
            HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topic.split(",")));
            // Kafka consumer configuration
            HashMap<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG,consumerGroupID);
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
            kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    
            SparkConf sparkConf = new SparkConf().setAppName("KafkaOffsetToHBase").setMaster("local[3]")
                    .set("spark.io.compression.codec","snappy");
    
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
            jssc.sparkContext().setLogLevel("WARN");
    
            Map<TopicPartition, Long> fromOffsetsMap = getLastCommittedOffsets(topic, consumerGroupID, hbaseTableName, zkQuorum, zkKafkaRootDir,
                    zkSessionTimeOut, zkConnectionTimeOut);
    
            JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent()
                    , ConsumerStrategies.Subscribe(topicsSet, kafkaParams, fromOffsetsMap));
    
            inputDStream.foreachRDD((javaRDD, time) -> {
                if(!javaRDD.isEmpty()){
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();
                    for (OffsetRange offset : offsetRanges) {
                        System.out.println("topic:"+offset.topic()+",partition:"+offset.partition()+",fromOffset:"
                                +offset.fromOffset()+",untilOffset:"+offset.untilOffset());
                    }
                // Processing logic
                    JavaRDD<String> newRDD = javaRDD.map(record -> record.value());
                    long count = newRDD.count();
    
                // Save the offsets to HBase.
                // Note: if the computed results and the offsets are stored separately, the results may be
                // written successfully while the offset write fails. On the next startup part of the data
                // would then be consumed again, so the output must be idempotent (see the sketch after the class).
                    saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,time);
                System.out.println("Records processed in this batch: " + count);
                }
            });
    
            jssc.start();
            jssc.awaitTermination();
    
        }
    
    
        /**
         * Save the untilOffset of each partition in the current batch to HBase.
         * @param topic_name     kafka topic name
         * @param group_id       kafka consumer group id
         * @param offsetRanges   offset ranges of the current batch
         * @param hbaseTableName HBase table that stores the offsets
         * @param batchTime      batch time, used as part of the row key
         */
        private static void saveOffsets(String topic_name, String group_id, OffsetRange[] offsetRanges
                , String hbaseTableName, Time batchTime) throws IOException {
    
            Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.set("hbase.zookeeper.property.clientPort","2181");
            hbaseConf.set("hbase.zookeeper.quorum","172.30.x.xx,172.30.x.xxx,172.30.x.xx");
            Connection connection = ConnectionFactory.createConnection(hbaseConf);
            Table table = connection.getTable(TableName.valueOf(hbaseTableName));
            String rowKey = topic_name+":"+group_id+":"+batchTime.milliseconds();
            Put put = new Put(rowKey.getBytes());
    
            for (OffsetRange offset : offsetRanges) {
                put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(String.valueOf(offset.partition()))
                        ,Bytes.toBytes(String.valueOf(offset.untilOffset())));
            }
            table.put(put);
            connection.close();
        }
    
        /**
         * Read the last committed offsets for the given topic and consumer group from HBase.
         * The topic's current partition count is looked up in ZooKeeper; partitions with no
         * offset recorded in HBase (first run, or newly added partitions) start from 0.
         * @param topic_name       kafka topic name
         * @param group_id         kafka consumer group id
         * @param hbaseTableName   HBase table that stores the offsets
         * @param zkQuorum         ZooKeeper quorum used by Kafka
         * @param zkRootDir        Kafka root znode in ZooKeeper
         * @param sessionTimeOut   ZooKeeper session timeout (ms)
         * @param connectionTimeOut ZooKeeper connection timeout (ms)
         * @return starting offset for each TopicPartition
         * @throws IOException
         */
        private static Map<TopicPartition, Long> getLastCommittedOffsets(String topic_name, String group_id
                , String hbaseTableName, String zkQuorum, String zkRootDir, int sessionTimeOut
                , int connectionTimeOut) throws IOException {
            Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.set("hbase.zookeeper.property.clientPort","2181");
            hbaseConf.set("hbase.zookeeper.quorum","172.30.x.xx,172.30.x.xx,172.30.x.xx");
    
            String zkUrl = zkQuorum + "/" + zkRootDir;
            Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl
                    , sessionTimeOut, connectionTimeOut);
    
        ZkUtils zkUtils = new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
            Seq<String> seq_topic = JavaConverters.asScalaIteratorConverter(Collections.singletonList(topic_name)
                    .iterator()).asScala().toSeq();
            int zKNumberOfPartitionsForTopic  = zkUtils.getPartitionsForTopics(seq_topic).get(topic_name)
                    .toList().head().size();
    
            //Connect to HBase to retrieve last committed offsets
            Connection conn = ConnectionFactory.createConnection(hbaseConf);
            Table table = conn.getTable(TableName.valueOf(hbaseTableName));
            String startRow = topic_name + ":" + group_id + ":" + System.currentTimeMillis();
            String stopRow = topic_name + ":" + group_id + ":" + 0;
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan.setStartRow(startRow.getBytes()).setStopRow(
                    stopRow.getBytes()).setReversed(true));
    
            Result result = scanner.next();
            //Set the number of partitions discovered for a topic in HBase to 0
            int hbaseNumberOfPartitionsForTopic = 0;
            if (result != null){
                //If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
                hbaseNumberOfPartitionsForTopic = result.listCells().size();
            }
    
    
            Map<TopicPartition, Long> fromOffsets = new HashMap<>();
            if(hbaseNumberOfPartitionsForTopic == 0){
            // First run: nothing recorded in HBase yet, so initialize fromOffsets to the beginning
            for (int partition = 0; partition < zKNumberOfPartitionsForTopic; partition++) {
                    fromOffsets.put(new TopicPartition(topic_name,partition),0L);
                }
            }else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
                // handle scenario where new partitions have been added to existing kafka topic
            // The topic has been repartitioned: ZooKeeper reports more partitions than are recorded in HBase
            for (int partition = 0; partition < hbaseNumberOfPartitionsForTopic; partition++) {
                // Resume the partitions that do have an offset recorded in HBase
                    String offset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(
                            String.valueOf(partition))));
                    fromOffsets.put(new TopicPartition(topic_name,partition),Long.valueOf(offset));
                }
            for (int partition = hbaseNumberOfPartitionsForTopic; partition < zKNumberOfPartitionsForTopic; partition++) {
                    fromOffsets.put(new TopicPartition(topic_name,partition),0L);
                }
    
            }else {
                //initialize fromOffsets from last run
            // No new partitions: after a restart, read the offsets recorded in HBase during the last run
                for (int partition = 0; partition < hbaseNumberOfPartitionsForTopic; partition++) {
                    String offset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(
                            String.valueOf(partition))));
                    fromOffsets.put(new TopicPartition(topic_name,partition),Long.valueOf(offset));
                System.out.println("Resuming from last committed offset -- topic_name:"+topic_name+",partition:"+partition+",fromOffset:"+offset);
                }
            }
            scanner.close();
            conn.close();
            return fromOffsets;
        }
    
    }
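
    As the comments in foreachRDD note, writing the batch result and the offsets in two separate operations can leave them out of sync if the job dies between the two writes. One way to tighten this (a sketch, not part of the original post) is to keep the result and the offsets in the same HBase row, since a single-row Put in HBase is atomic. It reuses the imports from the class above and assumes a hypothetical extra 'data' column family alongside 'offsets':

    // Sketch only: persist the batch result and the offset ranges in one atomic single-row Put.
    // Assumes an extra (hypothetical) 'data' column family, e.g. added with:
    //   alter 'stream_kafka_offsets', {NAME=>'data', TTL=>2592000}
    private static void saveResultAndOffsets(String topic, String groupId, long resultCount,
            OffsetRange[] offsetRanges, Table table, Time batchTime) throws IOException {
        String rowKey = topic + ":" + groupId + ":" + batchTime.milliseconds();
        Put put = new Put(Bytes.toBytes(rowKey));
        // Batch-level result value (here simply the record count from the example above)
        put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(resultCount)));
        // One cell per partition, same layout as saveOffsets()
        for (OffsetRange offset : offsetRanges) {
            put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(String.valueOf(offset.partition())),
                    Bytes.toBytes(String.valueOf(offset.untilOffset())));
        }
        // HBase guarantees atomicity for a single-row mutation, so the result and the offsets
        // either both persist or neither does
        table.put(put);
    }

    Inside foreachRDD, the separate saveOffsets(...) call would then be replaced by something like saveResultAndOffsets(topic, consumerGroupID, count, offsetRanges, table, time).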

    Official documentation: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets
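
    The page linked above also describes a simpler variant that stores the offsets in Kafka itself via commitAsync, which avoids the external HBase table when at-least-once delivery is acceptable. A minimal sketch against the same inputDStream built in the example (names reused from above; CanCommitOffsets is covered by the existing kafka010 wildcard import):

    // Alternative from the official docs: commit offsets back to Kafka after the output has completed
    inputDStream.foreachRDD(javaRDD -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();

        // ... process javaRDD and write the results first ...

        // Asynchronously commit the consumed ranges to Kafka's own offset storage.
        // The commit is not transactional with the output, so a failure in between
        // can replay the batch; outputs should therefore still be idempotent.
        ((CanCommitOffsets) inputDStream.inputDStream()).commitAsync(offsetRanges);
    });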

  • Original post: https://www.cnblogs.com/zz-ksw/p/12521972.html