  • Spark - A Spark Streaming example with updatable state

    Producer

    package zx.zx.sparkkafka
    
    
    import java.util.Properties
    
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    import scala.util.Random
    
    /**
     * Created by 166 on 2017/9/6.
     */
    object Producer {
      val topic="myWordCount1"
      val buffer: StringBuilder = new StringBuilder
      val message: Array[String] = Array("hadoop","scala","spark","kafka","java","storm","redis","hello","world")
      def getMessage:String={
        buffer.clear()
        for(info<-0 to 10)
          {
            if(info!=10) buffer.append(message(Random.nextInt(message.length)).concat(" ")) else buffer.append(message(Random.nextInt(message.length)))
          }
        buffer.toString()
      }
    
      def main(args: Array[String]) {
    
        //Properties holds the producer configuration
        val properties= new Properties
        //Add configuration: metadata.broker.list specifies the address and port of the Kafka brokers; multiple broker addresses can be listed
        properties.put("metadata.broker.list","192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
        //Serializer used when writing data to Kafka
        properties.put("serializer.class","kafka.serializer.StringEncoder")
        val producer= new Producer[String,String](new ProducerConfig(properties))
        for (i<-0 until Integer.MAX_VALUE){
          Thread.sleep(500)
          val message: KeyedMessage[String, String] = KeyedMessage[String,String](topic,"",null,getMessage)
          producer.send(message)
        }
      }
    }
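
    The kafka.producer.Producer Scala API used above only exists in the old Kafka 0.8.x client. As a point of reference, a rough sketch of the same producer written against the newer org.apache.kafka.clients.producer API might look like the following (the broker list is reused from above; the kafka-clients dependency itself is an assumption, not part of the original post):

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object NewApiProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties
        props.put("bootstrap.servers", "192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        //send one space-separated batch of words, the same payload Producer.getMessage builds above
        producer.send(new ProducerRecord[String, String](Producer.topic, Producer.getMessage))
        producer.close()
      }
    }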

    SparkStreamingDemo

    Note: a checkpoint directory must be set.
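
    For example, where the code below uses a local Windows directory, a cluster deployment would usually point the checkpoint at HDFS instead (the namenode address here is purely illustrative):

    //illustrative only -- any reliable, shared directory can serve as the checkpoint location
    ssc.checkpoint("hdfs://srv01:9000/spark/ck/myWordCount1")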

    package zx.zx.sparkkafka
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{HashPartitioner, SparkConf}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Created by 166 on 2017/9/6.
     */
    
    
    object SparkStreamingDemo {
    
      /**
       * Iterator[(String, Seq[Int], Option[Int])]
       * first element: the key, i.e. the word
       * second element: how many times the word appeared in the current batch
       * third element: the initial value or the count accumulated in previous batches
       */
      val updateFunc=(iter:Iterator[(String, Seq[Int], Option[Int])])=>{
          iter.map(t=>(t._1,t._2.sum+t._3.getOrElse(0)))
      }
      def main(args: Array[String]) {
    
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        //Create SparkConf and set the app name
        val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
        //Create the StreamingContext
        val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
        //Set the checkpoint ----- required if you want to update (accumulate) historical state
        //The checkpoint must be set; in practice it usually points at HDFS
        ssc.checkpoint("C:\\Users\\166\\Desktop\\Data\\ck")
    
    
        //These could also be accepted as command-line arguments
        //Pull data from Kafka
        val zkQuorum="srv01:2181,srv02:2181,srv03:2181"
        val groupId="g1"//groupID=UUID.randomUUID().toString
    
        //When there are many topics, split them like this ---topics={t1,t2,t3}
        //val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val topic = Map("myWordCount1"->3)
        val topicAndLine: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topic)
        //(key,message)--->map(_._2)===>message
        val lines: DStream[String] = topicAndLine.map(_._2) //this value may contain multiple lines
        //take the data line by line and split it
        //redis spark scala hadoop hello scala java java hadoop scala world
        //(redis,1),(spark,1)
        val words: DStream[(String, Int)] = lines.map(_.split(" ")).flatMap(x=>x).map((_,1))
        //Count the words
        val result: DStream[(String, Int)] = words.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
        //Print the results to the console
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
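
    updateStateByKey also has a simpler per-key overload, which some may find easier to read than the Iterator-based form above. A minimal equivalent sketch (not from the original post):

    //newValues: this batch's counts for one word; runningCount: the total accumulated in previous batches
    val simpleUpdate = (newValues: Seq[Int], runningCount: Option[Int]) =>
      Some(newValues.sum + runningCount.getOrElse(0))

    //drop-in replacement for the call above:
    //val result: DStream[(String, Int)] = words.updateStateByKey[Int](simpleUpdate)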
  • Original post: https://www.cnblogs.com/RzCong/p/7822563.html