zoukankan      html  css  js  c++  java
  • kafka学习知识点总结(四)

    代码调试过程中遇到的错误总结:

     KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
    
        )

    在代码编写的过程中,IDEA并没有自动识别方法Subscribe需要导入的jar包,但是该方法的jar已经有maven下载,在这个过程中,需要去查看jar类中的所有方法,手动去导入实现。

    import org.apache.spark.streaming.kafka010.ConsumerStrategies 中找到需要使用的方法Subscribe,然后对方法进行引用

    spark2.3+kafka0.11 direct模式 

    用代码实现对kafka数据的生产和消费 

    package com.bjsxt.scalaspark.streaming.streamingOnKafka
    
    import java.text.SimpleDateFormat
    import java.util.{Date, Properties}
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    
    import scala.util.Random
    
    /**
      * 向 kafka 中生产数据
      */
    object ProduceDataToKafka {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
        val producer = new KafkaProducer[String,String](props)
        var counter = 0
        var keyFlag = 0
        while(true){
          counter +=1
          keyFlag +=1
          val content: String = userlogs()
          producer.send(new ProducerRecord[String, String]("mytopic", s"key-$keyFlag", content))
          if(0 == counter%100){
            counter = 0
            Thread.sleep(5000)
          }
        }
    
        producer.close()
      }
    
      def userlogs()={
        val userLogBuffer = new StringBuffer("")
        val timestamp = new Date().getTime();
        var userID = 0L
        var pageID = 0L
    
        //随机生成的用户ID
        userID = Random.nextInt(2000)
    
        //随机生成的页面ID
        pageID =  Random.nextInt(2000);
    
        //随机生成Channel
        val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
        val channel = channelNames(Random.nextInt(10))
    
        val actionNames = Array[String]("View", "Register")
        //随机生成action行为
        val action = actionNames(Random.nextInt(2))
    
        val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
        userLogBuffer.append(dateToday)
          .append("	")
          .append(timestamp)
          .append("	")
          .append(userID)
          .append("	")
          .append(pageID)
          .append("	")
          .append(channel)
          .append("	")
          .append(action)
        System.out.println(userLogBuffer.toString())
        userLogBuffer.toString()
      }
    
    
    }

    从kafka 消费数据,并实现手动设置offset

    package com.bjsxt.scalaspark.streaming.streamingOnKafka
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.{Durations, StreamingContext}
    
    /**
      * SparkStreaming2.3版本 读取kafka 中数据 :
      *  1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。
      *  2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化
      *  3.kafka中有两个参数:
      *      heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s.
      *             这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms  的1/3
      *      session.timeout.ms :
      *             这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳【heartbeat.interval.ms 控制】,
      *             那么kafka将移除当前的消费者。这个时间默认是30s。
      *             这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数,如果SparkSteaming 批次间隔时间大于5分钟,
      *             也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。
      *  4.大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。
      *    如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。
      *    如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。
      *
      *  5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。
      *     可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。
      *     "注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access"
      *
      *  6.关于消费者offset
      *    1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。
      *     这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费,需要我们写代码来保证数据的输出幂等。
      *                第二,当代码逻辑改变时,无法从checkpoint中来恢复offset.
      *    2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的,
      *       因为有可能消费者自动提交了offset,但是后期SparkStreaming 没有将接收来的数据及时处理保存。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。
      *       这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。
      *       *如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset. 注意:kafka集群重启后会清空所有消费者组保存的信息,这也是这种模式的弊端
      *
      *    3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。
      *
      */
    object SparkStreamingOnKafkaDirect {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("SparkStreamingOnKafkaDirect")
        val ssc = new StreamingContext(conf,Durations.seconds(5))
        //设置日志级别
    //    ssc.sparkContext.setLogLevel("ERROR")
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "MyGroupId",//
    
          /**
            *
            *  earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始
            *  latest:自动重置偏移量为最大偏移量【默认】*
            *  none:没有找到以前的offset,抛出异常
            */
          "auto.offset.reset" -> "earliest",
    
          /**
            * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交
            */
          "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
        )
    
        val topics = Array[String]("t0218")
        val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,//消费策略
          Subscribe[String, String](topics, kafkaParams)
        )
    
        val transStrem: DStream[String] = stream.map(record => {
          val key_value = (record.key, record.value)
          println("receive message key = "+key_value._1)
          println("receive message value = "+key_value._2)
          key_value._2
        })
        val wordsDS: DStream[String] = transStrem.flatMap(line=>{line.split("	")})
        val result: DStream[(String, Int)] = wordsDS.map((_,1)).reduceByKey(_+_)
        result.print()
    
        /**
          * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset
          * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。
          */
        stream.foreachRDD { rdd =>
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // some time later, after outputs have completed
          for(or <- offsetRanges){
            println(s"current topic = ${or.topic},partition = ${or.partition},fromoffset = ${or.fromOffset},untiloffset=${or.untilOffset}")
          }
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
      }
    }

    代码实现流程:

    1、启动zookeeper

    2、启动各个节点的kafka 

    3、启动代码开始生成数据,并将生成的数据传输到消息队列

    4、查看生成的数据: ./kafka-consumer-groups.sh --bootstrap-server mynode1:9092,mynode2:9092,mynode3:9092 --list  (将生成的offset存储在自己在kafka中创建的组MyGroupID中)

    5、查看数据组的结构: ./kafka-consumer-groups.sh --bootstrap-server mynode1:9092,mynode2:9092,mynode3:9092 --list

    查看消息队列连接是否成功:

    1、启动zookeeper

    2、启动所有节点的kafkla

    3、将启动完成以后的kafka

    4、在bin目录下启动生产数据的脚本:  ./kafka-console-producer.sh --broker-list mynode1:9092,mynode2:9092,mynode3:9092 --topic t
    5、在bin目录下启动查看消费数据的脚本:./kafka-console-consumer.sh --zookeeper mynode1:2181,mynode2:2181,mynode3:2181 --topic topic0703

    手动维护消费者offset:
    将sparkStreaming和kafka以及redis整合之后,sparkstreaming先去kafka中读取数据,读取完之后,对数据进行处理,处理完之后,将offset存储到redis中
    当将sparkstreaming停止之后,下一次重启的时候,先去redis中去查询,最新的offset ,查询过来之后设置给kafka

    代码演示过程:

    1、首先启动redis   service redisd start 

    package com.bjsxt.scalaspark.streaming.streamingOnKafka
    
    
    import java.util
    
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    import scala.collection.mutable
    
    /**
      * 利用redis 来维护消费者偏移量
      */
    object ManageOffsetUseRedis {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("manageoffsetuseredis")
        //设置每个分区每秒读取多少条数据
        conf.set("spark.streaming.kafka.maxRatePerPartition","10")
        val ssc = new StreamingContext(conf,Durations.seconds(5))
        //设置日志级别
        ssc.sparkContext.setLogLevel("Error")
    
        val topic = "mytopic"
        /**
          * 从Redis 中获取消费者offset
          */
        val dbIndex = 3
        val currentTopicOffset: mutable.Map[String, String] = getOffSetFromRedis(dbIndex,topic)
        //初始读取到的topic offset:
        currentTopicOffset.foreach(x=>{println(s" 初始读取到的offset: $x")})
    
        //转换成需要的类型
        val fromOffsets: Predef.Map[TopicPartition, Long] = currentTopicOffset.map { resultSet =>
          new TopicPartition(topic, resultSet._1.toInt) -> resultSet._2.toLong
        }.toMap
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "MyGroupId-1",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
        )
        /**
          * 将获取到的消费者offset 传递给SparkStreaming
          */
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
        )
    
        stream.foreachRDD { rdd =>
    
          println("**** 业务处理完成  ****")
    
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"topic:${o.topic}  partition:${o.partition}  fromOffset:${o.fromOffset}  untilOffset: ${o.untilOffset}")
          }
    
          //将当前批次最后的所有分区offsets 保存到 Redis中
          saveOffsetToRedis(dbIndex,offsetRanges)
        }
    
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    
    
      }
    
      /**
        * 将消费者offset 保存到 Redis中
        *
        */
      def saveOffsetToRedis(db:Int,offsetRanges:Array[OffsetRange]) = {
        val jedis = RedisClient.pool.getResource
        jedis.select(db)
        offsetRanges.foreach(one=>{
          jedis.hset(one.topic, one.partition.toString,one.untilOffset.toString)
        })
        RedisClient.pool.returnResource(jedis)
      }
    
    
      /**
        * 从Redis中获取保存的消费者offset
        * @param db
        * @param topic
        * @return
        */
      def getOffSetFromRedis(db:Int,topic:String)  ={
        val jedis = RedisClient.pool.getResource
        jedis.select(db)
        val result: util.Map[String, String] = jedis.hgetAll(topic)
        RedisClient.pool.returnResource(jedis)
        if(result.size()==0){
          result.put("0","0")
          result.put("1","0")
          result.put("2","0")
        }
        import scala.collection.JavaConversions.mapAsScalaMap
        val offsetMap: scala.collection.mutable.Map[String, String] = result
        offsetMap
      }
    }



     

  • 相关阅读:
    这仅仅是一份工作
    和老总之间的对话
    假设满足怎样的条件,就不去编程
    那都是别人的架构
    程序员狂想曲
    学点经济学知识(三)
    一起来看 HTML 5.2 中新的原生元素 <dialog>
    动态配置页面 之 组件系统
    初识JavaScript EventLoop
    webpack+vue-cli+ElementUI+vue-resource 前端开发
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/11117888.html
Copyright © 2011-2022 走看看