zoukankan      html  css  js  c++  java
  • Spark Streaming与Kafka集成

    Spark Streaming与Kafka集成

    1、介绍

    kafka是一个发布订阅消息系统,具有分布式、分区化、多副本提交日志特点。kafka项目在0.8和0.10之间引入了一种新型消费者API,注意选择正确的包以获得相应的特性。每个版本都是向后兼容的,因此0.8可以兼容0.9和0.10,但是0.10不能兼容早期版本。0.8支持python、Receiver流和Direct流,不支持偏移量提交API以及动态分区订阅,0.10不支持python和Receiver流,支持Direct流、偏移量提交API和动态分区订阅。具体见表格:

    spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
    Broker Version 0.8.2.1 or higher 0.10.0 or higher
    API Maturity 过时 稳定
    支持语言 scala、java、python scala、java
    Receiver流 支持 不支持
    Direct流 支持 支持
    SSL/TLS 不支持 支持
    偏移量提交API 不支持 支持
    动态分区订阅 不支持 支持

    2、Spark Streaming与kafka集成

    0.10的集成方式和0.8类似,提供了在spark streaming 分区和kafka分区的1:1关系,可以访问偏移量和元数据。但由于使用的是新型消费者API,而不是简单API,因此还是有诸多注意事项。

    2.1 创建模块引入依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    

    注意:不要手动添加org.apache.kafka工件依赖,该依赖已经有正确的工件依赖,多个不同版本会导致不兼容。

    2.2 实现scala代码

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    /**
     * Created by Administrator on 2018/3/8.
     */
    object SparkStreamingKafkaScala {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("kafka")
        conf.setMaster("local[*]")
    
        val ssc = new StreamingContext(conf , Seconds(2))
    
        //kafka参数
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "s102:9092,s103:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("topic1")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
    
        val ds2 = stream.map(record => (record.key, record.value))
        ds2.print()
    
        ssc.start()
    
        ssc.awaitTermination()
      }
    }
    

    2.3 启动kafka集群

    $>xkafka.sh start
    

    2.4 创建主题

    创建主题,指定分区数和副本数,分区数和集群的内核数相同,保证最大并发能力,例如有三个节点,每个节点8个和,分区数为3 * 8 = 24,。

    $>kafka-topics.sh --zookeeper s102:2181 --create --topic topic1 
    	--replication-factor 3 --partitions 8
    

    2.5 启动控制台消费者

    $>kafka-console-consumer.sh --zookeeper s102:2181 --topic topic1
    

    2.6 启动控制台生产者

    $>kafka-console-producer.sh --broker-list s102:9092 --topic topic1
    

    2.7 在生产者终端输入消息,检查消费者是否能够收到

    spark_037

    2.8 启动Spark streaming应用

    spark_038

    2.9 查看流的分区

    //通过RDD查看分区数 , 分区数为4
    stream.foreachRDD(rdd=>{
    	println(rdd.partitions.length)
    })
    

    2.10 查看kafka主题的分区

    $>kafka-topics.sh --zookeeper s102:2181 --topic topic1 --describe
    

    运行结果如下:

    spark_039

    3、相关参数

    3.1 LocationStrategies

    新型kafka消费者API会预提取kafka数据到buffer中,因此让Spark在executor上保持缓存的consumer,对于性能来讲就非常重要,而不是每个batch创建新的consumer,选择在执行器上对于给定的主题分区如何调度消费者。位置策略的本意就是控制消费者在哪些节点上开启。

    • LocationStrategies.PreferConsistent

      大多数情况下,选择使用该方式,将在所有executors上均衡分布分区进行调度。

    • LocationStrategies.PreferBrokers

      如果executor和kafka broker位于同一主机,则可以使用该方式,这将优先调度那些分区为leader的分区。

    • LocationStrategies.PreferFixed

      如果在分区间有严重的数据倾斜,可以使用该方式,允许为分区指定特定的位置进行调度。

    3.2 ConsumerStrategies

    新型kafka消费者API有几种指定主题的方式。

    • ConsumerStrategies.Assign

      允许指定固定的分区集合。

    • ConsumerStrategies.Subscribe

      允许订阅固定的主题集合。

    • ConsumerStrategies.SubscribePattern

      可以使用正则表达式指定主题。

    4、使用PreferFixed和Assign组合

    指定s102消费主题的所有分区,每个分区下消费特定的偏移量。

    import java.net.Socket
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable.ArrayBuffer
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    /**
      * Created by Administrator on 2018/3/8.
      */
    object SparkStreamingKafkaScala {
    
      //发送消息给远程socket
      def sendInfo(msg: String, objStr: String) = {
        //获取ip
        val ip = java.net.InetAddress.getLocalHost.getHostAddress
        //得到pid
        val rr = java.lang.management.ManagementFactory.getRuntimeMXBean();
        val pid = rr.getName().split("@")(0);
        //pid
        //线程
        val tname = Thread.currentThread().getName
        //对象id
        val sock = new java.net.Socket("s101", 8888)
        val out = sock.getOutputStream
        val m = ip + "	:" + pid + "	:" + tname + "	:" + msg + "	:" + objStr + "
    "
        out.write(m.getBytes)
        out.flush()
        out.close()
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("kafka")
        //conf.setMaster("spark://s101:7077")
        conf.setMaster("local[8]")
    
        val ssc = new StreamingContext(conf, Seconds(5))
    
        //kafka参数
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "s102:9092,s103:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
    	//主题分区与主机名的映射,那个主题分区由哪台主机消费
        val map = scala.collection.mutable.Map[TopicPartition,String]()
    	/**************************************************
    	 ***********   一定要使用ip地址 !!!!*****************
    	 **************************************************/
        map.put(new TopicPartition("t1" , 0) , "192.168.231.102")
        map.put(new TopicPartition("t1" , 1) , "192.168.231.102")
        map.put(new TopicPartition("t1" , 2) , "192.168.231.102")
        map.put(new TopicPartition("t1" , 3) , "192.168.231.102")
        val fix = LocationStrategies.PreferFixed(map) ;
    
        val topics = Array("t1")
    
        //主题分区集合
        val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
        tps.+=(new TopicPartition("t1" , 0))
        tps.+=(new TopicPartition("t1" , 1))
        tps.+=(new TopicPartition("t1" , 2))
    
        //偏移量集合
        val offsets = scala.collection.mutable.Map[TopicPartition,Long]()
        offsets.put(new TopicPartition("t1", 0), 3)
        offsets.put(new TopicPartition("t1", 1), 3)
        offsets.put(new TopicPartition("t1", 2), 0)
    	
        //消费者策略,主题分区与偏移集合
        val conss = ConsumerStrategies.Assign[String,String](tps , kafkaParams , offsets)
    
        //创建kakfa直向流
        val stream = KafkaUtils.createDirectStream[String,String](
          ssc,
          fix,
          ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)
        )
    
        val ds2 = stream.map(record => {
          val t = Thread.currentThread().getName
          val key = record.key()
          val value = record.value()
          val offset = record.offset()
          val par = record.partition()
          val topic = record.topic()
          val tt = (key , value ,offset, par,topic ,t)
          sendInfo(tt.toString() ,this.toString)
          tt
        })
    
        ds2.print()
    
        ssc.start()
    
        ssc.awaitTermination()
      }
    }
    
    

    5、手动提交偏移量

    将rdd强制转换成CanCommitOffsets,通过该trait进行提交,且只能异步提交,可以指定回调函数对提交结果进行处理。

    val stream = KafkaUtils.createDirectStream(...)
    //...
    //在driver端提交,因为是DStream的方法,DStream不能串行化。
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
      //回调函数
      def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
        if (null != e) {
          // error
        } else {
          // success
        }
      }
    })
    

    6、消费语义

    • 最多一次

      at most ,最多消费一次。先提交,后消费。

      //先提交
      commitAsync()
      //后消费
      consume()
      

    • 最少一次

      //先消费
      consume()
      //后提交
      commitAsync()
      
    • 精准一次

      //提交偏移量到数据库,不在kafka中
      conn.setAutoCommit();
      consume()
      updateOffset()
      conn.commit();
      

  • 相关阅读:
    python 的时间复杂度
    python之进制转换
    进程、线程、协程
    [GO]gtk的windows环境搭建
    [GO]并的爬取捧腹的段子
    [GO]并发的网络爬虫
    [GO]百度贴吧的爬虫
    [operator]jenkins+gitlab/Webhook自动构建发布
    [GO]并发实现聊天室服务器
    [GO]文件的收发服务器
  • 原文地址:https://www.cnblogs.com/xupccc/p/9545693.html
Copyright © 2011-2022 走看看