  • Kafka's createDirectStream

    The Kafka API (Spark Streaming's KafkaUtils) provides two interfaces for obtaining a stream: createStream and createDirectStream.

    createStream is the simpler one: a topic, a group id and the ZooKeeper quorum are all it needs to return a stream. Brokers and offsets are a black box that requires no handling, but that also means they cannot be controlled, which is often unacceptable in a real project. Part of the source code:

    /**
       * Create an input stream that pulls messages from Kafka Brokers.
       * @param ssc       StreamingContext object
       * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
       * @param groupId   The group id for this consumer
       * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
       *                  in its own thread
       * @param storageLevel  Storage level to use for storing the received objects
       *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
       * @return DStream of (Kafka message key, Kafka message value)
       */
      def createStream(
          ssc: StreamingContext,
          zkQuorum: String,
          groupId: String,
          topics: Map[String, Int],
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[(String, String)] = {
        val kafkaParams = Map[String, String](
          "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
          "zookeeper.connection.timeout.ms" -> "10000")
        createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics, storageLevel)
      }
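
    For a sense of how it is used, here is a minimal sketch (the ZooKeeper quorum, group id and topic below are placeholders; ssc is an existing StreamingContext):

    // receiver-based stream: ZooKeeper quorum, consumer group, and topic -> consumer-thread count
    val receiverStream = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181",        // placeholder ZooKeeper quorum
      "my-consumer-group",        // placeholder group id
      Map("my-topic" -> 1))       // placeholder topic and thread count
    receiverStream.map(_._2).print()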

    createDirectStream talks to Kafka directly and leaves saving the offsets to you; its doc comment explains this quite clearly. Part of the source code:

    /**
       * Create an input stream that directly pulls messages from Kafka Brokers
       * without using any receiver. This stream can guarantee that each message
       * from Kafka is included in transformations exactly once (see points below).
       *
       * Points to note:
       *  - No receivers: This stream does not use any receiver. It directly queries Kafka
       *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
       *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
       *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
       *    You can access the offsets used in each batch from the generated RDDs (see
       *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
       *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
       *    in the [[StreamingContext]]. The information on consumed offset can be
       *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
       *  - End-to-end semantics: This stream ensures that every records is effectively received and
       *    transformed exactly once, but gives no guarantees on whether the transformed data are
       *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
       *    that the output operation is idempotent, or use transactions to output records atomically.
       *    See the programming guide for more details.
       *
       * @param ssc StreamingContext object
       * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
       *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
       *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
       *    host1:port1,host2:port2 form.
       * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
       *    starting point of the stream
       * @param messageHandler Function for translating each message and metadata into the desired type
       * @tparam K type of Kafka message key
       * @tparam V type of Kafka message value
       * @tparam KD type of Kafka message key decoder
       * @tparam VD type of Kafka message value decoder
       * @tparam R type returned by messageHandler
       * @return DStream of R
       */
      def createDirectStream[
        K: ClassTag,
        V: ClassTag,
        KD <: Decoder[K]: ClassTag,
        VD <: Decoder[V]: ClassTag,
        R: ClassTag] (
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          fromOffsets: Map[TopicAndPartition, Long],
          messageHandler: MessageAndMetadata[K, V] => R
      ): InputDStream[R] = {
        val cleanedHandler = ssc.sc.clean(messageHandler)
        new DirectKafkaInputDStream[K, V, KD, VD, R](
          ssc, kafkaParams, fromOffsets, cleanedHandler)
      }

    /**
       * Create an input stream that directly pulls messages from Kafka Brokers
       * without using any receiver. This stream can guarantee that each message
       * from Kafka is included in transformations exactly once (see points below).
       *
       * Points to note:
       *  - No receivers: This stream does not use any receiver. It directly queries Kafka
       *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
       *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
       *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
       *    You can access the offsets used in each batch from the generated RDDs (see
       *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
       *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
       *    in the [[StreamingContext]]. The information on consumed offset can be
       *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
       *  - End-to-end semantics: This stream ensures that every records is effectively received and
       *    transformed exactly once, but gives no guarantees on whether the transformed data are
       *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
       *    that the output operation is idempotent, or use transactions to output records atomically.
       *    See the programming guide for more details.
       *
       * @param ssc StreamingContext object
       * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
       *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
       *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
       *   host1:port1,host2:port2 form.
       *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
       *   to determine where the stream starts (defaults to "largest")
       * @param topics Names of the topics to consume
       * @tparam K type of Kafka message key
       * @tparam V type of Kafka message value
       * @tparam KD type of Kafka message key decoder
       * @tparam VD type of Kafka message value decoder
       * @return DStream of (Kafka message key, Kafka message value)
       */
      def createDirectStream[
        K: ClassTag,
        V: ClassTag,
        KD <: Decoder[K]: ClassTag,
        VD <: Decoder[V]: ClassTag] (
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Set[String]
      ): InputDStream[(K, V)] = {
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        val kc = new KafkaCluster(kafkaParams)
        val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
        new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      }
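
    For comparison, the simpler overload only needs the broker list and the topic set; where the stream starts is then governed by auto.offset.reset instead of explicit offsets. A minimal sketch with placeholder brokers and topic:

    // placeholder broker list; offsets are tracked by the stream itself
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092",
      "auto.offset.reset" -> "smallest")
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))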

    What this project needs is manual control over the offsets, which is exactly what the two extra parameters of the first overload provide: fromOffsets: Map[TopicAndPartition, Long] and messageHandler: MessageAndMetadata[K, V] => R.

    The approach for building fromOffsets is roughly:

    1. Connect to ZooKeeper.

    2. Get the topics and their partitions.

    3. For each topic, walk its partitions and read each partition's offset (stored in ZooKeeper under /consumers/[group id]/offsets/[topic]/[0 ... N]).

    4. The path may not exist yet; in that case fetch the latest offset from the partition leader instead.

    The corresponding code is below (useful sources to consult: kafka.utils.ZkUtils, org.apache.spark.streaming.kafka.KafkaUtils, kafka.tools.GetOffsetShell, and the classes that call them):

    private def getOffset = {
        val fromOffset: mutable.Map[TopicAndPartition, Long] = mutable.Map()
    
        // open a ZooKeeper connection using the configured quorum and timeouts
        val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(kafkaZkQuorum, kafkaZkSessionTimeout, kafkaZkSessionTimeout)
        val zkUtil = new ZkUtils(zkClient, zkConnection, false)
    
        // for every configured topic, list its partitions as registered in ZooKeeper
        zkUtil.getPartitionsForTopics(kafkaTopic.split(",").toSeq)
          .foreach({ topic2Partition =>
            val topic = topic2Partition._1
            val partitions = topic2Partition._2
            val topicDirs = new ZKGroupTopicDirs(groupId, topic)
    
            partitions.foreach(partition => {
              val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
              zkUtil.makeSurePersistentPathExists(zkPath)
    
              val untilOffset = zkUtil.zkClient.readData[String](zkPath)
    
              val tp = TopicAndPartition(topic, partition)
              // no offset stored in ZK yet: fall back to the latest offset on the partition leader
              val offset = {
                if (null == untilOffset)
                  getLatestLeaderOffsets(tp, zkUtil)
                else untilOffset.toLong
              }
              fromOffset += (tp -> offset)
            }
            )
          })
        zkUtil.close()
        fromOffset.toMap
      }

    For messageHandler, the same function that the other overload builds internally is all that is needed:

    messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

    Next comes getLatestLeaderOffsets:

    private def getLatestLeaderOffsets(tp: TopicAndPartition, zkUtil: ZkUtils): Long = {
        try {
          // look up the leader broker of this partition and read its host:port from /brokers/ids
          val brokerId = zkUtil.getLeaderForPartition(tp.topic, tp.partition).get
          val brokerInfoString = zkUtil.readDataMaybeNull(s"${ZkUtils.BrokerIdsPath}/$brokerId")._1.get
          val brokerInfo = Json.parseFull(brokerInfoString).get.asInstanceOf[Map[String, Any]]
    
          val host = brokerInfo("host").asInstanceOf[String]
          val port = brokerInfo("port").asInstanceOf[Int]
    
          // ask the partition leader directly for its latest offset (same approach as kafka.tools.GetOffsetShell)
          val consumer = new SimpleConsumer(host, port, 10000, 100000, "getLatestLeaderOffsets")
          val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
          val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets
    
          offsets.head
        } catch {
          // report which partition failed and keep the original cause
          case e: Exception => throw new Exception("Failed to fetch the latest offset for " + tp, e)
        }
      }

    Finally, the call itself:

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc,
          kafkaParams, getOffset, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
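
    Because the offsets only end up in ZooKeeper if this application writes them there, they also have to be written back after every batch, otherwise a restart would re-read already processed data. The write-back below is a sketch rather than part of the original code: stream stands for the DStream returned by the createDirectStream call above, and zkUtil for a ZkUtils handle like the one opened in getOffset.

    stream.foreachRDD { rdd =>
      // the direct stream exposes each batch's offset ranges via HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        // same path layout that getOffset reads: /consumers/[group id]/offsets/[topic]/[partition]
        val topicDirs = new ZKGroupTopicDirs(groupId, range.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${range.partition}"
        zkUtil.updatePersistentPath(zkPath, range.untilOffset.toString)
      }
    }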

    Since the job has to survive failures and run 24/7, checkpointing must be enabled, and the business logic has to live inside the function that (re)creates the checkpointed context. The code:

    def main(args: Array[String]): Unit = {
    
        val run = gatewayIsEnable || urlAnalysIsEnable
    
        if (run) {
    
          val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    
          ssc.start()
          ssc.awaitTermination()
        }
      }
    
      def createStreamingContext() = {
        val duration = SysConfig.duration(2)
    
        val sparkConf = new SparkConf().setAppName("cmhi")
        val ssc = new StreamingContext(sparkConf, Seconds(duration))
    
        ssc.checkpoint(checkpointDir)
    
        Osgi.init(ssc, debug)
        
        ssc
      }
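
    One point worth stressing: Osgi.init(ssc, debug) is where this project presumably builds its streams and outputs, and it is called inside createStreamingContext on purpose. Everything from createDirectStream down to the output operations has to run inside the function handed to getOrCreate, or it will not be restored from the checkpoint after a driver failure. A hedged sketch of such a setup function (processBatch is a hypothetical processing step; kafkaParams is the map used in the call above):

      def createStreamingContextSketch(): StreamingContext = {
        val ssc = new StreamingContext(new SparkConf().setAppName("cmhi"), Seconds(duration))
        ssc.checkpoint(checkpointDir)

        // stream creation, transformation and output are all set up inside the checkpointed function
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, getOffset, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
        stream.foreachRDD { rdd =>
          processBatch(rdd)  // hypothetical business logic
          // ...then persist the consumed offsets, e.g. as in the ZooKeeper write-back above
        }
        ssc
      }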
