  Apache Spark in Action, Part 1 -- KafkaWordCount

    Reposting is welcome; please credit the source. -- 徽沪一郎

    Overview

    Developing Spark applications is very much a hands-on affair, and a lot of time tends to go into setting up and running the environment; a good walkthrough can shorten the development cycle considerably. Spark Streaming in particular integrates with many third-party systems, and there is little detailed documentation on how to actually get the examples in the source tree running.

    This post focuses on how to run KafkaWordCount, which first requires setting up a Kafka cluster; the steps are spelled out in as much detail as possible.

    Setting up a Kafka cluster

    Step 1: Download and extract Kafka 0.8.1.1

    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
    tar zvxf kafka_2.10-0.8.1.1.tgz
    cd kafka_2.10-0.8.1.1
    

    Step 2: Start ZooKeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    Step 3: Edit the configuration file config/server.properties and add the following

    host.name=localhost
    
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=localhost
    

    Step 4: Start the Kafka server

    bin/kafka-server-start.sh config/server.properties
    

    Step 5: Create a topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test
    

    Verify that the topic was created successfully

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    If everything is working, the command returns test.

    Step 6: Start a producer and send some messages

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    ## Once the producer has started, type the following lines to test it
    This is a message
    This is another message
    
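    For reference, the same two test messages could also be sent from code. Below is a minimal sketch against the Kafka 0.8 Scala producer API, using the broker and topic set up above; treat it as an illustrative sketch rather than part of the original walkthrough.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object SimpleProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092")             // broker started in step 4
        props.put("serializer.class", "kafka.serializer.StringEncoder") // send plain strings
        val producer = new Producer[String, String](new ProducerConfig(props))
        producer.send(new KeyedMessage[String, String]("test", "This is a message"))
        producer.send(new KeyedMessage[String, String]("test", "This is another message"))
        producer.close()
      }
    }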

    Step 7: Start a consumer and receive the messages

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    ### Once the consumer has started, it should print whatever was typed on the producer side
    This is a message
    This is another message
    
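    Likewise, here is a minimal sketch of a programmatic consumer using the Kafka 0.8 high-level consumer API, with the same ZooKeeper address, group, and topic as above; this is an illustration, not the console tool's actual implementation.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object SimpleConsumer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181")   // ZooKeeper started in step 2
        props.put("group.id", "test-consumer-group")       // consumer group name
        props.put("auto.offset.reset", "smallest")         // roughly the --from-beginning behaviour
        val connector = Consumer.create(new ConsumerConfig(props))
        // Request one stream (thread) for topic "test" and print every message received
        val stream = connector.createMessageStreams(Map("test" -> 1))("test").head
        for (msgAndMeta <- stream) {
          println(new String(msgAndMeta.message, "UTF-8"))
        }
      }
    }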

    Running KafkaWordCount

    The KafkaWordCount source file lives at examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

    It does contain a usage note, reproduced below, but without some prior familiarity with Kafka it is hard to tell what the parameters mean or how to fill them in.

    package org.apache.spark.examples.streaming

    // Package and imports needed to compile this excerpt
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka._

    /**
     * Consumes messages from one or more topics in Kafka and does wordcount.
     * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
     *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
     *   <group> is the name of kafka consumer group
     *   <topics> is a list of one or more kafka topics to consume from
     *   <numThreads> is the number of threads the kafka consumer should use
     *
     * Example:
     *    `$ bin/run-example \
     *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
     *      my-consumer-group topic1,topic2 1`
     */
    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc =  new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")
    
        val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    With the main motivation for this post out of the way, let's see how to actually run KafkaWordCount.

    Step 1: Stop the kafka-console-producer and kafka-console-consumer started earlier

    Step 2: Run KafkaWordCountProducer

    bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
    

    A quick explanation of the arguments: localhost:9092 is the address and port of the Kafka broker the producer sends to, test is the topic, 3 is the number of messages to send per second, and 5 is the number of words in each message.
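    To make the two numeric arguments concrete, here is a rough sketch of the per-second generation loop; it approximates what KafkaWordCountProducer does (the random-digit "words" reflect the example's behaviour, but treat the code as illustrative rather than a verbatim quote).

    // Roughly: every second, send `messagesPerSec` messages, each containing
    // `wordsPerMessage` space-separated words (here, random digits).
    val messagesPerSec = 3        // third command-line argument above
    val wordsPerMessage = 5       // fourth command-line argument above
    (1 to messagesPerSec).foreach { _ =>
      val msg = (1 to wordsPerMessage)
        .map(_ => scala.util.Random.nextInt(10).toString)
        .mkString(" ")            // e.g. "7 0 3 3 9"
      // producer.send(new KeyedMessage[String, String]("test", msg))
    }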

    Step 3: Run KafkaWordCount

     bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
    

    A quick explanation of the arguments: localhost:2181 is the ZooKeeper listen address; test-consumer-group is the consumer group name, which must match the group.id setting in $KAFKA_HOME/config/consumer.properties; test is the topic; and 1 is the number of consumer threads.
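    For clarity, here is how those four arguments flow into the quoted code above; the values are copied from the run-example command and the snippet is purely illustrative.

    val Array(zkQuorum, group, topics, numThreads) =
      Array("localhost:2181", "test-consumer-group", "test", "1")
    // The topic string becomes a topic -> thread-count map:
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // topicpMap == Map("test" -> 1): one consumer thread for the single topic "test"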

    Original post: https://www.cnblogs.com/hseagle/p/3887507.html