zoukankan      html  css  js  c++  java
  • scala spark streaming 打印kafka 数据

    how-to-fix-java-io-notserializableexception-org-apache-kafka-clients-consumer

     The Kafka ConsumerRecord object is received from the DStream. When you try to print it, you get an error because that object is not serializable. Instead, extract the values from the ConsumerRecord object and print those.

    参考链接:https://stackoverflow.com/questions/40570874/how-to-fix-java-io-notserializableexception-org-apache-kafka-clients-consumer

    1、获取kafka数据

    /**
     * Companion class of the `WindowsFunction` object defined further down in this file.
     * It declares no members of its own.
     *
     * @author xlxxx
     * @date xxxx 16:49
     * @version 1.0
     */
    class WindowsFunction
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies}
    //import org.apache.spark.streaming.kafka.KafkaUtil
    //import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /**
     * Spark Streaming demo: subscribes to a Kafka topic through the kafka010 direct
     * stream API and prints the value of every record received in each batch.
     */
    object WindowsFunction {

      /**
       * Computes the average payment per user by joining per-user payment totals
       * with per-user payment counts.
       *
       * @param sum   stream of (user, total payment amount) pairs
       * @param count stream of (user, number of payments) pairs
       * @return stream of (user, total / count) pairs produced by `sum.join(count)`
       */
      def avgFunction(sum: DStream[(String, Double)], count: DStream[(String, Int)]): DStream[(String, Double)] =
        sum.join(count).map { case (user, (total, times)) => (user, total / times) }

      /**
       * Entry point: builds (or obtains) the streaming context, starts it and blocks
       * until the streaming job terminates.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {

        /** Builds the streaming context and wires the Kafka source to the print action. */
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val ssc = new StreamingContext(conf, Seconds(5))

          // The kafka010 direct stream is configured with the Kafka bootstrap servers;
          // the ZooKeeper quorum below was only used by the commented-out legacy call.
    //      val zkQuorum = "localhost:2181,192.168.6.56:2181,192.168.6.57:2181"
          val brokers = "localhost:9092"
          val consumerGroupName = "user_payment"
          val kafkaTopic = "testkafka"
          val kafkaThreadNum = 1

          // Split the comma-separated topic list once and reuse it below.
          val topics = kafkaTopic.split(",")

          // topicMap is only printed for debugging; the direct stream uses topicsSet.
          val topicMap = topics.map(_ -> kafkaThreadNum).toMap
          println(topicMap)
    //      val user_payment = KafkaUtils.createDirectStream(ssc, zkQuorum, consumerGroupName, topicMap).map(x=>{
    //        parse(x._2)
    //      })

          val topicsSet = topics.toSet
          val kafkaParams = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
            ConsumerConfig.GROUP_ID_CONFIG -> consumerGroupName,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
          val user_payment = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
          //from old
    //     val Array(brokers, groupId, topics) = args
    //      val messages = KafkaUtils.createDirectStream[String, String](
    //        ssc,
    //        LocationStrategies.PreferConsistent,
    //        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

          //from olds
          // Print the payload of each record rather than the ConsumerRecord itself.
          user_payment.foreachRDD { rdd =>
            rdd.foreach { record =>
              println(record.value())
            }
          }
    //      user_payment.map(jsonLine => print("========"+jsonLine))
    //      user_payment.map(record=>(record.value().toString)).print
    //      user_payment.print()
          // Compute the total payment of each user for every 5s batch
    //      val paymentSum = user_payment.map(jsonLine =>{
    ////        implicit val formats = DefaultFormats
    //        println(jsonLine)
    ////        val user = (jsonLine  "user").extract[String]
    ////        val payment = (jsonLine  "payment").extract[String]
    ////        (user,payment)
    ////        ('user',1)
    //      }).flatMap((_.split(" "))).reduceByKey(_+_)
    //      val paymentSum = user_payment.map(_.value).flatMap().reduceByKey(_+_)

          // Print the result
    //      paymentSum.print()
    //
    //      // Count the payments of each user for every 5s batch
    //      val paymentCount = user_payment.map(jsonLine =>{
    //        implicit val formats = DefaultFormats
    //        val user = (jsonLine  "user").extract[String]
    //        (user,1)
    //      }).reduceByKey(_+_)
    //
    //      //      paymentCount.print()
    //
    //      // Compute the average payment of each user for every 5s batch
    //      val paymentAvg = avgFunction(paymentSum,paymentCount)
    //      //      paymentAvg.print()

          // Window operations: compute results over different time ranges; whether to
          // persist them to a database depends on the use case
    //      def windowsFunction()  {
    //        // Every 5 seconds, compute each user's payment total over the last 30 seconds
    //        val windowSum_30 = paymentSum.reduceByKeyAndWindow((a: Double, b: Double) => (a + b),_-_, Seconds(30), Seconds(5))
    //        //        windowSum_30.print()
    //
    //        // Every 5 seconds, compute each user's payment count over the last 30 seconds
    //        val windowCount_30 = paymentCount.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),_-_, Seconds(30), Seconds(5))
    //        //        windowCount_30.print()
    //
    //        // Every 5 seconds, compute each user's average payment over the last 30 seconds
    //        val windowAvg_30 = avgFunction(windowSum_30,windowCount_30)
    //        //        windowAvg_30.print()
    //
    //        // Every 5 seconds, compute each user's payment total over the last 60 seconds
    //        val windowSum_60 = windowSum_30.reduceByKeyAndWindow((a:Double,b:Double)=>(a+b),_-_,Seconds(10),Seconds(5))
    //        //       windowSum_60.print()
    //
    //        // Every 5 seconds, compute each user's payment count over the last 60 seconds
    //        val windowCount_60 = windowCount_30.reduceByKeyAndWindow((a:Int,b:Int) => (a+b),_-_,Seconds(10),Seconds(5))
    //        //        windowCount_60.print()
    //
    //        // Every 5 seconds, compute each user's average payment over the last 60 seconds
    //        val windowAvg_60 = avgFunction(windowSum_60,windowCount_60)
    //        //        windowAvg_60.print
    //      }
    //
    //      windowsFunction()

          ssc
        }

        // NOTE(review): functionToCreateContext never calls ssc.checkpoint(...), so
        // presumably nothing is ever written to "checkPoint" and a fresh context is
        // built on every run -- confirm whether checkpoint recovery is intended.
        val context = StreamingContext.getOrCreate("checkPoint", functionToCreateContext _)

        context.start()
        context.awaitTermination()
      }
    }
    

     2、debug 截图:

        

      //map 打印

  • 相关阅读:
    2018北美部分CS项目学费
    APP接口自动化测试JAVA+TestNG(二)之TestNG简介与基础实例
    浅谈MITM攻击之信息窃取(解密315晚会报道的免费WIFI窃取个人信息)
    APP接口自动化测试JAVA+TestNG(一)之框架环境搭建
    Android测试提升效率批处理脚本(二)
    Android APP压力测试(三)之Monkey日志自动分析脚本
    Android系统build.prop文件
    Android APP压力测试(二)之Monkey信息自动收集脚本
    Android APP压力测试(一)之Monkey工具介绍
    Android反编译(三)之重签名
  • 原文地址:https://www.cnblogs.com/cbugs/p/14213305.html
Copyright © 2011-2022 走看看