  • Daily problem, for work: 2020-05-02, Problem 61

    //Data processing with Kafka + Spark Streaming

    //Pulling data from Kafka

    package com.swust.predict
    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    object GetDataFromKafka {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("GetDataFromKafka").setMaster("local[*]")
        conf.set("spark.streaming.kafka.consumer.cache.enabled","false")
    
        val ssc = new StreamingContext(conf,Seconds(5))
        val topics = Set("car_events")
        val brokers = "data001:9092,data003:9092,data004:9092"
        val kafkaParams = Map[String,Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "predictGroup",//
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
        )
        //create the direct stream from Kafka
        val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
    
        //extract the message value from each ConsumerRecord
        val events: DStream[String] = kafkaDstream.map(record => record.value())
        //print every record of each batch
        events.foreachRDD(rdd => {
          rdd.foreachPartition(partition => {
            partition.foreach(record => println(record))
          })
        })
    //    //to commit offsets manually, read and commit them on the original kafkaDstream, not on the mapped DStream
    //    kafkaDstream.foreachRDD(rdd => {
    //      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //      kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
    //    })
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
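
    //A possible next step (not in the original post): the producer below writes JSON events with
    //"speed" and "road_id" fields, so the consumer could, for example, aggregate the average speed
    //per road in each micro-batch. A minimal sketch, assuming net.sf.json is on the classpath as in
    //the producer; it would replace the println loop inside main above.

    import net.sf.json.JSONObject

    events.foreachRDD(rdd => {
      //(road_id, (speed sum, count)) for the current batch
      val speedByRoad = rdd.map(line => {
        val json = JSONObject.fromObject(line)
        (json.getString("road_id"), (json.getString("speed").toDouble, 1))
      }).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

      //print the per-road average speed
      speedByRoad.collect().foreach { case (road, (sum, count)) =>
        println(s"road $road avg speed ${sum / count}")
      }
    })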
    

      

    //Pushing data to Kafka

    package com.traffic.streaming
    
    import java.util.Properties
    
    import net.sf.json.JSONObject
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.{SparkConf, SparkContext}
    
    //produce data into the Kafka topic car_events
    object KafkaEventProducer {
      def main(args: Array[String]): Unit = {
    
        //the Kafka topic to write to
        val topic = "car_events"
        //producer configuration properties
        val props = new Properties()
        props.put("bootstrap.servers", "data001:9092,data003:9092,data004:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
        //create the Kafka producer
        val producer = new KafkaProducer[String,String](props)
    
        val sparkConf = new SparkConf().setAppName("traffic data").setMaster("local[4]")
        val sc = new SparkContext(sparkConf)
    
        val records: Array[Array[String]] = sc.textFile("F:\\code\\AnyMaven\\data\\carFlow_all_column_test.txt")
          .filter(!_.startsWith(";")) //drop comment lines that start with ";"
          .map(_.split(",")).collect()
    
    
        for (i <- 1 to 1000) {
          for (record <- records) {
            // prepare event data
            val event = new JSONObject()
            event.put("camera_id", record(0))
            event.put("car_id", record(2))
            event.put("event_time", record(4))
            event.put("speed", record(6))
            event.put("road_id", record(13))
            // produce the event message and send it to Kafka
            producer.send(new ProducerRecord[String, String](topic, event.toString))
    //        println("Message sent: " + event)
            Thread.sleep(200)
          }
        }
        producer.close()
        sc.stop()
      }
    }
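
    //Note (not in the original post): producer.send is asynchronous, so send failures go unnoticed
    //above. A minimal sketch of attaching a callback to log errors, using the standard KafkaProducer
    //callback API; it would replace the producer.send call inside the loop.

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    producer.send(new ProducerRecord[String, String](topic, event.toString), new Callback {
      //called once the broker has acknowledged (or rejected) the record
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace()
        else println(s"sent to partition ${metadata.partition()}, offset ${metadata.offset()}")
      }
    })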
    

      

    //Run results

    //Pushing data to the Kafka cluster

    //Pulling data from the Kafka cluster
