zoukankan      html  css  js  c++  java
  • 笔记 很早东西日常的一些复制粘贴 怕忘了

    对于kafak与sparkstreaming集成后 存在的问题 
    一。基于receiver的方式在kafka1.0后好像是去取消了 都是高级api
    默认是200毫秒接受的数据形成一个block块,设置5s为一个批次 那就是5000/200 为25个分区
     
     1.val kafkaParams = Map(
          "zookeeper.connect" -> "bigdata.server1:2181",   //连接zookeeper的地址,获取和提交offet
          "group.id" ->"KafkaReceive",             //消费组的名称
          "zookeeper.connection.timeout.ms" -> "10000",
          "auto.offset.reset"-> "smallest"    //当前sparksreaing对应的消费者组第一次消费的时候方式,当前是从头消费
        )
        val lines: DStream[String] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
            ssc,
            kafkaParams,
            topics,
            StorageLevel.MEMORY_AND_DISK_SER_2
        ).map(_._2)
        
    2.  val topics = Map("test1" -> 4)
    
        val lines = KafkaUtils.createStream(
            ssc,
          "KafkaReceiverWC02",
          "bigdata.server1:2181",
            topics
        ).map(_._2)
        
    二。基于direct模式
    对应的是是topic有几个分区就有几个task
    对应的也是两种集成
    低级api可以定义从哪消费
        //由于Direct方式的kafka和Spark Streaming的集成方式中采用的api是低级封装的api(low lever api),消费的offset信息不需要zookeeper保存,而是直接去找broker节点
        val kafkaParams = Map(
          "metadata.broker.list"->"bigdata.server1:9092,bigdata.server1:9093,bigdata.server1:9094,bigdata.server1:9095"
        )
    
        //由于Direct方式的kafka和Spark Streaming的集成方式中采用的api是低级封装的api(low lever api),此时消费者的offet,由自己保管,不再是zookeeper,
        // 同时还可以自己指定从哪个offet开始消费 ,指定消费的topic以及对应每个分区,开始消费的offset
        val fromOffsets:Map[TopicAndPartition, Long] = Map(
          TopicAndPartition("bc",0) -> 0,
          TopicAndPartition("bc",1) -> 100,
          TopicAndPartition("bc",2) -> 200,
          TopicAndPartition("bc",3) -> 300
        )
    
        //MessageAndMetadata可以同时获取message的所属的topic,partiron,offset等元数据,也可以获取key和value,这里仅需要value
        val messageHandler: MessageAndMetadata[String, String] => String = (mmd:MessageAndMetadata[String, String])=>{
          //Messaged的Metadata
         // mmd.topic
         // mmd.partition
         // mmd.offset            在元数据区域获取到的偏移量与对应分区 进行存储
          //Messaged本身
          //mmd.key()
          mmd.message()
        }
    
        val lines: InputDStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,String](
          ssc,
          kafkaParams,
          fromOffsets,
          messageHandler
        )
        
        HasOffsetRanges是一个接口 kafkardd是他的子类 也是rdd的子类 所以使用foreachRdd都是rdd    
        class KafkaRDD[
        K: ClassTag,
        V: ClassTag,
        U <: Decoder[_]: ClassTag,
        T <: Decoder[_]: ClassTag,
        R: ClassTag] private[spark] (
        sc: SparkContext,
        kafkaParams: Map[String, String],
        val offsetRanges: Array[OffsetRange],
        leaders: Map[TopicAndPartition, (String, Int)],
        messageHandler: MessageAndMetadata[K, V] => R
      ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges
    
    
    
    
        DirectKafkaInputDStream是inputstream的子类
        DirectKafkaInputDStream.foreachRdd后都是{都是kafkaRdd}
        
     kafkaRDD.asInstanceOf[HasOffsetRanges]    
        
  • 相关阅读:
    【C++】资源管理
    【Shell脚本】逐行处理文本文件
    【算法题】rand5()产生rand7()
    【Shell脚本】字符串处理
    Apple iOS产品硬件参数. 不及格的程序员
    与iPhone的差距! 不及格的程序员
    iPhone游戏 Mr.Karoshi"过劳死"通关. 不及格的程序员
    XCode V4 发布了, 苹果的却是个变态. 不及格的程序员
    何时readonly 字段不是 readonly 的?结果出呼你想象!!! 不及格的程序员
    object file format unrecognized, invalid, or unsuitable Command 不及格的程序员
  • 原文地址:https://www.cnblogs.com/hejunhong/p/10493428.html
Copyright © 2011-2022 走看看