zoukankan      html  css  js  c++  java
  • Spark - Spark Streaming 可更新状态（updateStateByKey）的实例

    Producer

    package zx.zx.sparkkafka
    
    
    import java.util.Properties
    
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    import scala.util.Random
    
    /**
     * Created by 166 on 2017/9/6.
     */
    object Producer {
      /** Kafka topic the generated sentences are written to (the consumer subscribes to the same name). */
      val topic = "myWordCount1"

      /**
       * Scratch buffer formerly shared by every call to [[getMessage]].
       * No longer used: a single mutable builder on a singleton object made `getMessage`
       * unsafe to call from more than one thread. Retained only so existing references compile.
       */
      val buffer: StringBuilder = new StringBuilder

      /** Vocabulary the random sentences are drawn from. */
      val message: Array[String] = Array("hadoop","scala","spark","kafka","java","storm","redis","hello","world")

      /** Words per generated sentence (the original loop iterated over `0 to 10`, i.e. 11 times). */
      private val WordsPerMessage = 11

      /**
       * Builds one sentence of [[WordsPerMessage]] words, each picked uniformly at random
       * (with replacement) from [[message]], separated by single spaces with no trailing space.
       *
       * @return a freshly built sentence; no shared state is mutated
       */
      def getMessage: String =
        Seq.fill(WordsPerMessage)(message(Random.nextInt(message.length))).mkString(" ")

      /**
       * Sends one random sentence to [[topic]] every 500 ms using the legacy Kafka Scala producer.
       *
       * @param args optional; `args(0)` overrides the comma-separated broker list
       *             (defaults to the three brokers hard-coded below)
       */
      def main(args: Array[String]): Unit = {
        val brokers = args.headOption.getOrElse("192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")

        // Producer configuration.
        val properties = new Properties
        // metadata.broker.list: host:port of one or more Kafka brokers.
        properties.put("metadata.broker.list", brokers)
        // Serializer used when writing records to Kafka.
        properties.put("serializer.class", "kafka.serializer.StringEncoder")

        val producer = new Producer[String, String](new ProducerConfig(properties))
        try {
          for (_ <- 0 until Integer.MAX_VALUE) {
            Thread.sleep(500)
            // NOTE(review): the key is "" (non-null) and partKey is null, so the legacy producer
            // appears to partition on "" and send every record to the same partition; confirm
            // that is intended for a multi-partition topic.
            val record: KeyedMessage[String, String] = KeyedMessage[String, String](topic, "", null, getMessage)
            producer.send(record)
          }
        } finally {
          // Release broker connections even if send() or sleep() throws.
          producer.close()
        }
      }
    }

    SparkStreamingDemo

    注意：必须设置 checkpoint 目录，否则使用 updateStateByKey 的程序在启动时会报错（生产环境一般设置在 HDFS 上）。

    package zx.zx.sparkkafka
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Created by 166 on 2017/9/6.
     */
    
    
    object SparkStreamingDemo {

      /**
       * State-update function passed to `updateStateByKey`.
       *
       * Each input element is `(word, batchCounts, previousTotal)`:
       *  - `word`: the key;
       *  - `batchCounts`: the counts emitted for that word in the current batch;
       *  - `previousTotal`: the initial value or the total accumulated so far (`None` if absent).
       * Emits `(word, batchCounts.sum + previousTotal.getOrElse(0))`.
       */
      val updataFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
        _.map { case (word, batchCounts, previousTotal) =>
          (word, batchCounts.sum + previousTotal.getOrElse(0))
        }

      /**
       * Receiver-based Kafka word count whose totals accumulate across batches.
       *
       * @param args optional; `args(0)` overrides the checkpoint directory
       */
      def main(args: Array[String]): Unit = {
        // ERROR rather than OFF: keeps the console readable without hiding real failures.
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

        val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
        // Batch interval of 2 seconds.
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

        // updateStateByKey keeps state across batches, so a checkpoint directory is mandatory;
        // outside local testing it is usually placed on HDFS.
        // Backslashes must be escaped: "\U", "\D" and "\c" are not valid Scala escapes and
        // "\166" would be read as an octal escape.
        val checkpointDir = args.headOption.getOrElse("C:\\Users\\166\\Desktop\\Data\\ck")
        ssc.checkpoint(checkpointDir)

        // Pull data from Kafka via ZooKeeper (receiver-based API).
        val zkQuorum = "srv01:2181,srv02:2181,srv03:2181"
        val groupId = "g1" // alternatively UUID.randomUUID().toString for a fresh consumer group

        // topic -> number of consumer threads. For many topics (e.g. "t1,t2,t3") build it with:
        // val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val topic = Map("myWordCount1" -> 3)
        val topicAndLine: ReceiverInputDStream[(String, String)] =
          KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

        // Records are (key, message); keep only the message text, which may span several lines.
        val lines: DStream[String] = topicAndLine.map(_._2)

        // e.g. "redis spark scala hadoop ..." -> (redis,1), (spark,1), ...
        // Split on any whitespace run and drop empty tokens so repeated spaces, tabs or newlines
        // are not counted as an empty "word".
        val words: DStream[(String, Int)] =
          lines.flatMap(_.split("\\s+")).filter(_.nonEmpty).map((_, 1))

        // Running total per word over all batches so far.
        val result: DStream[(String, Int)] = words.updateStateByKey(
          updataFunc,
          new HashPartitioner(ssc.sparkContext.defaultParallelism),
          rememberPartitioner = true)

        // Print the totals to the console each batch.
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 相关阅读:
    InstallShield2015制作安装包----------卸载前结束执行中的进程
    InstallShield2015制作安装包----------安装过程中修改文件内容
    InstallShield2015制作安装包----------卸载后删除安装目录和文件
    InstallShield2015制作安装包----------安装后实现自动运行
    snmp getTable demo :iftable ipAddresstable
    snmp
    Android Lazy url
    eclipse key
    demo16Toast
    demo15 AlertDialog
  • 原文地址:https://www.cnblogs.com/RzCong/p/7822563.html
Copyright © 2011-2022 走看看