  • Scala Spark Streaming + Kafka + Zookeeper: Publishing and Consuming Data

    I. Spark Streaming

      Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, or TCP sockets, and can be processed with complex algorithms expressed through high-level functions like map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards.
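
      For orientation, here is a minimal sketch of such a pipeline (my own illustration based on the standard Spark Streaming API, not code from this post): it ingests lines from a TCP socket, processes them with flatMap/map/reduceByKey, and pushes the word counts to the console.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SocketWordCount {
      def main(args: Array[String]): Unit = {
        // Local streaming context with two threads and a 2-second batch interval
        val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Ingest a stream of lines from a TCP socket (for example one fed by `nc -lk 9999`)
        val lines = ssc.socketTextStream("localhost", 9999)
        // Process with high-level functions: flatMap, map, reduceByKey
        val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
        // Push the processed data to a sink; here we simply print to the console
        counts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }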


    II. Spark Streaming Implementation

      Kafka and Zookeeper were installed in advance; the Kafka service cannot run unless Zookeeper is installed first. (For CloudKarafka clusters, however, Zookeeper is already installed and configured.)

      I built a Scala Maven project; both the project and the environment run on a single machine.

      1. First, my pom.xml configuration:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.sparkstream</groupId>
        <artifactId>LyhSparkStreaming</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spark.version>2.3.3</spark.version>
            <scala.version>2.11</scala.version>
        </properties>
    
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>commons-beanutils</groupId>
                        <artifactId>commons-beanutils-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
            <!-- Kafka 0.8 direct-stream integration; keep its version in line with the Spark version -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.19</version>
                    <configuration>
                        <skip>true</skip>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    
    </project>
    

      The Scala version I use is 2.11.8.

      2. My Producer. The code reads the contents of text1.txt and sends each line to a Kafka topic named Hunter. You can change the topic name; if the topic does not exist in Kafka, it is created automatically. Create text1.txt yourself; one record per line is all it needs, with no other requirements, and change the file path to your own.

    package KafkaAndStreaming
    
    import java.io.{BufferedReader, FileInputStream, FileNotFoundException, IOException, InputStreamReader}
    import java.util.Properties
    
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}
    
    
    object TestKafkaProducer {
    
      type Key = String
      type Val = String
    
      def getProducerConfig(): Properties = {
        /**
         * Configuration items for the Kafka producer.
         * There are other producer properties as well; see the Kafka documentation.
         **/
        val props:Properties = new Properties()
        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092")
        // group.id is a consumer-side setting; the producer ignores it
        props.put("group.id", "producer-group")
        // acks controls durability: "all" waits for the leader and all in-sync replicas to acknowledge the write
        props.put("acks", "all")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props
      }
    
      def main(args: Array[String]): Unit = {
        // Load the producer configuration
        val props: Properties = getProducerConfig()
        // Create the producer
        val producer = new KafkaProducer[String, String](props)
        try{
          // Read the saved text file
          val fis:FileInputStream = new FileInputStream("/Users/hunter/text1.txt")
          val isr:InputStreamReader = new InputStreamReader(fis, "UTF-8")
          val br:BufferedReader = new BufferedReader(isr)
          var line: String = ""
    
          line = br.readLine()
          var i: Int=0
    
          while (line != null) {
            // send is asynchronous; the callback runs once the broker acknowledges the record
            producer.send(toMessage(line.toString, Option(i.toString), Option("Hunter")), new Callback {
              override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
                println(s"""Message $i, sent to: """ + recordMetadata.topic())
              }
            })
            i += 1
            line = br.readLine()
          }
          producer.close()
          br.close()
          isr.close()
          fis.close()
        } catch {
          case ex: FileNotFoundException =>{
            println("Missing file exception")
          }
          case ex: IOException => {
            println("IO Exception")
          }
          case _ =>{
            println("Have other Error")
          }
        }
    //    dealWithData
      }
    
      // Wrap a message (value, optional key, optional topic) into a ProducerRecord
      private def toMessage(value: String, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
        val t = topic.getOrElse("test") // fall back to the default topic "test"
        require(!t.isEmpty, "Topic must not be empty")
        key match {
          case Some(k) => new ProducerRecord(t, k, value)
          case _ => new ProducerRecord(t, value)
        }
      }
    }
    

      3. My Consumer. The main function requires startup arguments, in this case: localhost:9092 Hunter


      The code is as follows:

    package KafkaAndStreaming
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaAndPrintInSpark {
      // Validate the input arguments: they must contain the brokers and the topics, i.e. at least
      // 2 arguments. For a single-machine run with a topic named test: localhost:9092 test
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println(
            s"""
               |Usage: DirectKafkaWordCount <brokers> <topics>
               |  <brokers> is a list of one or more Kafka brokers
               |  <topics> is a list of one or more kafka topics to consume from
            """.stripMargin)
          System.exit(1)
        }
    
        // Unpack the args array into brokers and topics
        val Array(brokers, topics) = args
    
        // Create the streaming context with a 5-second batch interval
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
    
        // Create the direct Kafka stream against the given brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String](
          // the 0.8 direct stream takes its broker list from metadata.broker.list
          "metadata.broker.list" -> brokers,
          "bootstrap.servers" -> brokers,
    //      "auto.offset.reset" -> "smallest",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
    
    // The commented-out block below starts from custom offsets: 5L means begin reading at offset 5.
    // By default the stream reads the latest data, continuing from where the previous read left off.
    // (A standalone, runnable sketch of this appears right after this listing.)
    //    val offsetList = List(("Hunter", 0, 5L))
    //    val fromOffsets = setFromOffsets(offsetList) // turn the list into the required Map[TopicAndPartition, Long]
    //    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // build the MessageAndMetadata handler; this part is the same in every use case
    //    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    //      ssc, kafkaParams, fromOffsets, messageHandler)
    
        messages.foreachRDD( rdd => {
    //      if(rdd.count()>0) {
            // Print each (key, value) record; in local mode this prints to the driver console
            rdd.foreach( records => {
              println("_1: " + records._1)
              println("_2: " + records._2)
            })
    
            // Print the offset range (topic, partition, fromOffset, untilOffset) handled by each partition
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter =>
              val o: OffsetRange = offsetRanges(TaskContext.get.partitionId())
              println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            }
    //      }
        })
        // Start the computation
        ssc.start()
        ssc.awaitTermination()
      }
    
      def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        for (offset <- list) {
          val tp = TopicAndPartition(offset._1, offset._2) // topic and partition
          fromOffsets += (tp -> offset._3)                 // starting offset for that partition
        }
        fromOffsets
      }
    }
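
      As a follow-up to the commented-out custom-offset block above, here is a small, self-contained sketch (my own illustration, not part of the original post) of how the printed offset ranges could be fed back in on the next start. The loadSavedOffsets helper is hypothetical; in practice the (topic, partition, untilOffset) triples would be persisted to Zookeeper, a database, or a file and read back here.

    package KafkaAndStreaming

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ResumeFromSavedOffsets {
      // Hypothetical helper: replace with whatever storage the saved offsets were written to
      def loadSavedOffsets(): List[(String, Int, Long)] = List(("Hunter", 0, 5L))

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ResumeFromSavedOffsets")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

        // Same conversion as setFromOffsets above: (topic, partition, offset) -> Map[TopicAndPartition, Long]
        val fromOffsets: Map[TopicAndPartition, Long] =
          loadSavedOffsets().map { case (topic, partition, offset) =>
            TopicAndPartition(topic, partition) -> offset
          }.toMap

        // Rebuild the direct stream starting from the saved offsets
        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)

        messages.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }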
    

      III. Final Results

        1. Producer

           (producer screenshot omitted)

        2. Consumer

           (consumer screenshot omitted)

  • Original post: https://www.cnblogs.com/Lyh1997/p/11458614.html