zoukankan      html  css  js  c++  java
  • Apache Spark技术实战之1 -- KafkaWordCount

    欢迎转载,转载请注明出处,徽沪一郎。

    概要

    Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程。Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细。

    本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好。

    搭建Kafka集群

    步骤1:下载kafka 0.8.1及解压

    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
    tar zvxf kafka_2.10-0.8.1.1.tgz
    cd kafka_2.10-0.8.1.1
    

    步骤2:启动zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    步骤3:修改配置文件config/server.properties,添加如下内容

    host.name=localhost
    
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=localhost
    

    步骤4:启动Kafka server

    bin/kafka-server-start.sh config/server.properties
    

    步骤5:创建topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test
    

    检验topic创建是否成功

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    如果正常返回test

    步骤6:打开producer,发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    ##启动成功后,输入以下内容测试
    This is a message
    This is another message
    

     步骤7:打开consumer,接收消息

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    ###启动成功后,如果一切正常将会显示producer端输入的内容
    This is a message
    This is another message
    

    运行KafkaWordCount

    KafkaWordCount源文件位置 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

    尽管里面有使用说明,见下文,但如果不是事先对Kafka有一定的了解的话,决然不知道这些参数是什么意思,也不知道该如何填写

    /**
     * Consumes messages from one or more topics in Kafka and does wordcount.
     * Usage: KafkaWordCount    
     *    is a list of one or more zookeeper servers that make quorum
     *    is the name of kafka consumer group
     *    is a list of one or more kafka topics to consume from
     *    is the number of threads the kafka consumer should use
     *
     * Example:
     *    `$ bin/run-example 
     *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 
     *      my-consumer-group topic1,topic2 1`
     */
    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount    ")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc =  new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    讲清楚了写这篇博客的主要原因之后,来看一看该如何运行KafkaWordCount

    步骤1:停止运行刚才的kafka-console-producer和kafka-console-consumer

    步骤2:运行KafkaWordCountProducer

    bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
    

    解释一下参数的意思,localhost:9092表示producer的地址和端口, test表示topic,3表示每秒发多少条消息,5表示每条消息中有几个单词

    步骤3:运行KafkaWordCount

     bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
    

    解释一下参数, localhost:2181表示zookeeper的监听地址,test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致,test表示topic,1表示线程数。

  • 相关阅读:
    ThinkPHP 小于5.0.24 远程代码执行高危漏洞 修复方案
    Nginx负载均衡配置与负载策略
    【高级】PHP-FPM和Nginx的通信机制
    快手、抖音、微视类短视频SDK接入教程,7步就能搞定
    我是怎么一步步用go找出压测性能瓶颈
    一个域名引发的血案……
    开发效率太低?您可能没看这篇文章
    想熟悉PostgreSQL?这篇就够了
    tee命令
    linux下常用的日志分析命令
  • 原文地址:https://www.cnblogs.com/hseagle/p/3887507.html
Copyright © 2011-2022 走看看