  • A real-time clickstream example based on Kafka + Spark Streaming + HBase

     Background

    Kafka ingests data in real time from the collection tool Flume or from business-system interfaces, and acts as a message buffer that provides a reliable data feed to the downstream real-time computation framework. Since version 1.3, Spark has supported two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official Spark documentation for details. Computed results are stored in HBase.

    Implementation approach

    1. Implement a Kafka message producer simulator
    2. Spark Streaming consumes the Kafka data in real time using the Direct Approach
    3. Spark Streaming performs the business computation and stores the results in HBase
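
    The business computation in step 3 boils down to summing click counts per user within each micro-batch. A minimal sketch of that aggregation logic, written in Python purely for illustration (the field names match the Scala generator below):

    ```python
    # Sum click_count per uid within one micro-batch, mirroring
    # Spark's map + reduceByKey over (uid, click_count) pairs.
    def aggregate_clicks(events):
        totals = {}
        for e in events:
            uid = e["uid"]
            totals[uid] = totals.get(uid, 0) + e["click_count"]
        return totals

    batch = [
        {"uid": "u1", "click_count": 3},
        {"uid": "u2", "click_count": 5},
        {"uid": "u1", "click_count": 2},
    ]
    print(aggregate_clicks(batch))  # {'u1': 5, 'u2': 5}
    ```

    In Spark this same shuffle-and-sum happens once per 5-second batch, so each batch produces per-user totals for that window only.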

    Component versions

    Spark 2.1.0, Kafka 0.9.0.1, HBase 1.2.0

    Code

    Kafka message simulator

    import java.util.{Properties, UUID}
    import scala.util.Random
    import com.alibaba.fastjson.JSONObject
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object KafkaMessageGenerator {

      private val random = new Random()
      private var pointer = -1

      private val os_type = Array(
        "Android", "IPhone OS",
        "None", "Windows Phone"
      )

      // Random click count between 0 and 9
      def click(): Int = random.nextInt(10)

      // Cycle through the OS types round-robin
      def getOsType(): String = {
        pointer = (pointer + 1) % os_type.length
        os_type(pointer)
      }

      def main(args: Array[String]): Unit = {

        val topic = "user_events"
        val props = new Properties()
        props.put("bootstrap.servers", "10.3.71.154:9092")
        // Key and value are both plain strings, matching KafkaProducer[String, String]
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        while (true) {
          val event = new JSONObject()
          event.put("uid", UUID.randomUUID().toString)               // random user id
          event.put("event_time", System.currentTimeMillis.toString) // event timestamp
          event.put("os_type", getOsType())                          // device type
          event.put("click_count", click())                          // click count
          val record = new ProducerRecord[String, String](topic, event.toString)
          producer.send(record)
          println("Message sent: " + event)

          Thread.sleep(200)
        }
      }
    }
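
    The value of each Kafka record is the JSON-serialized event, so the consumer must parse exactly this shape back out. A quick round-trip sketch in Python (for illustration only; the field names are the ones the generator above emits):

    ```python
    import json
    import random
    import time
    import uuid

    # Build one event in the same shape as the Scala generator above.
    def make_event():
        return {
            "uid": str(uuid.uuid4()),
            "event_time": str(int(time.time() * 1000)),
            "os_type": random.choice(["Android", "IPhone OS", "None", "Windows Phone"]),
            "click_count": random.randint(0, 9),
        }

    event = make_event()
    payload = json.dumps(event)   # what is sent as the Kafka record value
    parsed = json.loads(payload)  # what the streaming consumer recovers
    assert parsed == event        # serialization round-trips losslessly
    ```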

    Spark Streaming main class

    import com.alibaba.fastjson.{JSON, JSONObject}
    import kafka.serializer.StringDecoder
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object PageViewStream {
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("PageViewStream").setMaster("local[*]")
        // Create the StreamingContext with a 5-second batch interval
        val ssc = new StreamingContext(conf, Seconds(5))
        // Kafka consumer configuration
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "10.3.71.154:9092"
        )
        // Create a direct stream on the user_events topic
        val kafkaStream: InputDStream[(String, String)] =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, Set("user_events"))
        // Parse each record's value (the JSON event) into a JSONObject
        val events: DStream[JSONObject] = kafkaStream.map(line => JSON.parseObject(line._2))

        // Sum click counts per user within each batch
        val userClicks: DStream[(String, Int)] =
          events.map(x => (x.getString("uid"), x.getIntValue("click_count"))).reduceByKey(_ + _)
        userClicks.foreachRDD(rdd => {
          rdd.foreachPartition(partitionOfRecords => {
            // HBase configuration -- one connection per partition
            val tableName = "PageViewStream2"
            val hbaseConf = HBaseConfiguration.create()
            hbaseConf.set("hbase.zookeeper.quorum", "master66")
            hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
            val conn = ConnectionFactory.createConnection(hbaseConf)
            val statTable = conn.getTable(TableName.valueOf(tableName))
            partitionOfRecords.foreach { case (uid, click) =>
              // Use the user id as the rowkey and write the batch's click count
              val put = new Put(Bytes.toBytes(uid))
              put.addColumn(Bytes.toBytes("Stat2"), Bytes.toBytes("ClickStat"), Bytes.toBytes(click.toString))
              statTable.put(put)
            }
            statTable.close()
            conn.close()
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
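
    One caveat worth noting about the write path: because the uid is the rowkey and each batch issues a fresh put on the same column, a later batch's value replaces the earlier one rather than adding to it, so the table holds the latest batch's count, not a running total. A small Python sketch of that overwrite semantics (a plain dict stands in for the HBase table; accumulating a true total would require a read-modify-write or an HBase counter):

    ```python
    # Simulate HBase put semantics: a put on the same rowkey/column
    # replaces the visible cell value; it does not add to it.
    table = {}

    def hbase_put(rowkey, value):
        table[rowkey] = value  # overwrite, like Put on the same column

    hbase_put("u1", 5)   # batch 1: user u1 clicked 5 times
    hbase_put("u1", 3)   # batch 2: user u1 clicked 3 more times
    print(table["u1"])   # 3 -- the earlier count is no longer visible
    ```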
     
  • Original article: https://www.cnblogs.com/itboys/p/9156701.html