zoukankan      html  css  js  c++  java
  • 7.5 高级数据源---Kafka

    一、Kafka简介

      Kafka是一种高吞吐量的分布式发布订阅消息系统,用户通过Kafka系统可以发布大量的消息,同时也能实时订阅消费消息。Kafka可以同时满足在线实时处理和批量离线处理。

      在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换。

    1. Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。
    2. Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
    3. Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
    4. Producer:负责发布消息到Kafka broker。
    5. Consumer:消息消费者,向Kafka broker读取消息的客户端。
    6. Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

    二、Kafka准备工作

    1.安装Kafka

    厦门大学安装教程

    这里假设已经成功安装Kafka到“/usr/local/kafka”目录下

    2.启动Kafka

    下载的安装文件为Kafka_2.11-0.10.2.0.tgz,前面的2.11就是该Kafka所支持的Scala版本号,后面的0.10.2.0是Kafka自身的版本号。

    打开一个终端,输入下面命令启动Zookeeper服务:

    千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。

    打开第二个终端,然后输入下面命令启动Kafka服务:

    千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。

    3.测试Kafka是否正常工作

    再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

    默认服务器启动的端口号是2181,replication-factor表示数据要复制1份,--topic后面加topic的名称。列出所有创建的topic,如果能看到wordsendertest,就说明创建成功。

    下面用生产者(Producer)来产生一些数据,请在当前终端内继续输入下面命令:

    上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

    hello hadoop

    hello spark

    现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

     

    可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

    hello hadoop

    hello spark

    三、Spark准备工作

    1.添加相关jar包

      Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),这些jar文件在安装好Spark的时候是不存在的。在spark-shell中执行下面import语句进行测试:

    jar包下载地址:http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0

    把jar文件复制到Spark目录的jars目录下

    继续把Kafka安装目录的libs目录下的所有jar文件复制到“/usr/local/spark/jars/kafka”目录下,请在终端中执行下面命令:

    2.启动spark-shell

    执行如下命令启动spark-shell:

    启动成功后,再次执行如下命令:

    四、编写Spark Streaming程序使用Kafka数据源

    1.编写生产者(producer)程序

    编写KafkaWordProducer程序。执行命令创建代码目录:

    在KafkaWordProducer.scala中输入以下代码:

    下面代码实现功能:每秒钟发m条消息,每条消息包含Y个单词

    package org.apache.spark.examples.streaming
    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object KafkaWordProducer {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
            "<messagesPerSec> <wordsPerMessage>")
          System.exit(1)
        }
        val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
        // Zookeeper connection properties
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
    
    // Send some messages
        while(true) {
          (1 to messagesPerSec.toInt).foreach { messageNum =>
            val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
              .mkString(" ")
                        print(str)
                        println()
            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)
          }
         Thread.sleep(1000)
        }
      }
    }

    2.编写消费者(consumer)程序

    继续在当前目录下创建KafkaWordCount.scala代码文件,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

     当数据量非常大时,不设置检查点有可能会发生信息的丢失。

    package org.apache.spark.examples.streaming
    import org.apache.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object KafkaWordCount{
    def main(args:Array[String]){
    StreamingExamples.setStreamingLogLevels()
    val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sc,Seconds(10))
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
    val zkQuorum = "localhost:2181" //Zookeeper服务器地址
    val group = "1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group“ 
    
    val topics = "wordsender"  //topics的名称          
    val numThreads = 1  //每个topic的分区数
    val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
    val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x,1))
    val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //这行代码的含义在下一节的窗口转换操作中会有介绍
    wordCounts.print
    ssc.start
    ssc.awaitTermination
    }
    } 

    3.编写日志格式设置程序(通用)

    继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:

    package org.apache.spark.examples.streaming
    import org.apache.spark.Logging
    import org.apache.log4j.{Level, Logger}
    /** Utility functions for Spark Streaming examples. */
    object StreamingExamples extends Logging {
      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.
          logInfo("Setting log level to [WARN] for streaming example." +
            " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }

    4.编译打包程序

    创建simple.sbt文件:

     在simple.sbt中输入以下代码:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
    

    进行打包编译:  

    5.运行程序

    打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

    执行上面命令后,屏幕上会不断滚动出现新的单词,如下图所示,不要关闭这个终端窗口,就让它一直不断发送单词

    请新打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

     运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

  • 相关阅读:
    转: sql语句获取本周、本月数据
    Java 程序中的静态代码块
    Java 控制台程序输出计时器代码
    网页出现“繁体字”?
    html5中video视频只有声音没有图像
    Clipboard 剪辑板
    区分window8中 ie10 window phone8
    void 0
    touch pointer
    全局作用域 eval
  • 原文地址:https://www.cnblogs.com/nxf-rabbit75/p/12028371.html
Copyright © 2011-2022 走看看