zoukankan      html  css  js  c++  java
  • spark 系列之七 SparkStreaming数据源之kafka流

    突然感觉kafka跟socket有点像,只是kafka具备更多的功能,是一个经典的消费者生产者模式。

    kafka中有不同的topic,生产者可以把数据发送到不同的topic,消费可以指定相应的topic进行消费。

    本文就kafka是什么,不做详细的介绍依旧是上两张图。

    图一:展示了kafka的强大的扩展能力,扩展能力强了,自然吞吐能力会大大增强。

     图二:展示了kafka的工作模式,也就是在业务上的扩展能力,业务能力很强。

    本讲主要是在本地操作,首先你需要在本地下载一个kafka的服务,并且启动它,当前启动之前你肯定要先启动ZK,网上有大量的怎么在windows上启动kafka的教程

    然后,在你项目中,根据你的spark版本可kafka的版本下载相应的驱动包(我理解成驱动包,有点类似于前面的数据库的驱动包,方便连接kafka和SparkStreaming)

    我使用的pom.xml的配置如下:

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>

    首先使用kafka 生产消息代码如下:

    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    
    object KafkaWordProducer {
      def main(args: Array[String]) {
        /**
         * 第1个参数localhost:9092是Kafka的broker的地址,第2个参数wordsender是topic的名称
         * 第3个参数“3”表示每秒发送3条消息,第4个参数“10”表示,每条消息包含10个单词(实际上就是10个整数)
         */
        val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("localhost:9092","wordsender","3","10")
        // Zookeeper connection properties
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        // Send some messages
        while(true) {
          (1 to messagesPerSec.toInt).foreach { messageNum =>
            val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
            println(str)
            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)
          }
          Thread.sleep(1000)
        }
      }
    }

    运行情况如下:

    其次使用sparkStreaming来消费kafka生产的数据,并进行相应的数据操作,代码如下:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object SparkKafka {
      def main(args:Array[String]){
        val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sc,Seconds(10))
        ssc.checkpoint("file:///D:/2020Q4/spark_kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
        val zkQuorum = "localhost:2181" //Zookeeper服务器地址
        val group = "1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group"
        val topics = "wordsender"  //topics的名称
        val numThreads = 1  //每个topic的分区数
        val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
    
        val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
        val lines = lineMap.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val pair = words.map(x => (x,1))
        val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) 
        wordCounts.print
        ssc.start
        ssc.awaitTermination
      }
    }

    运行结果如下:

    以上:) 

  • 相关阅读:
    iphone那些事儿
    【转】我面试过最出色的项目主管,入职半年就离职了。
    net::ERR_ABORTED 404 (Not Found)
    四大名著
    测试心理状态
    typescript那些事儿
    flexbox父盒子flex-direction属性
    flexbox父盒子align-content属性
    flexbox父盒子flex-wrap属性
    flexbox父盒子align-items属性
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14218203.html
Copyright © 2011-2022 走看看