zoukankan      html  css  js  c++  java
  • SparkStreaming+Kafka 实现统计基于缓存的实时uv

    前言

    本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现统计基于缓存的实时wordcount,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。

    1、数据

    数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)

    2、kafka topic

    首先在kafka建一个程序用到topic:KafkaUV

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV

    3、创建checkpoint的hdfs目录

    我的目录为:/spark/dkl/kafka/UV_checkpoint

    hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint

    4、Spark代码

    启动下面的程序

    package com.dkl.leanring.spark.kafka
    
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    object KafkaUV {
      def main(args: Array[String]): Unit = {
        //初始化,创建SparkSession
        val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate()
        //初始化,创建sparkContext
        val sc = spark.sparkContext
        //初始化,创建StreamingContext,batchDuration为5秒
        val ssc = new StreamingContext(sc, Seconds(5))
    
        //开启checkpoint机制
        ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint")
    
        //kafka集群地址
        val server = "ambari.master.com:6667"
    
        //配置消费者
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> server, //kafka集群地址
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "UpdateStateBykeyWordCount", //消费者组名
          "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量   earliest 、none
          "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
        val topics = Array("KafkaUV") //消费主题
    
        //基于Direct方式创建DStream
        val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    
        //开始执行WordCount程序
    
        //以空格为切分符切分单词,并转化为 (word,1)形式
        val words = stream.flatMap(_.value().split(" ")).map((_, 1))
        val wordCounts = words.updateStateByKey(
          //每个单词每次batch计算的时候都会调用这个函数
          //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
          //第二个参数为这个key对应的之前的状态
          (values: Seq[Int], state: Option[Int]) => {
            var newValue = state.getOrElse(0)
            values.foreach(newValue += _)
            Option(newValue)
          })
        //共享变量,便于后面的比较是否用新的uv
        val accum = sc.longAccumulator("uv")
        wordCounts.foreachRDD(rdd => {
          //如果uv增加
          if (rdd.count > accum.value) {
            //打印uv
            println(rdd.count)
            //将共享变量的值更新为新的uv
            accum.add(rdd.count - accum.value)
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
    }

    5、生产几条数据

    随便写几条即可

    bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV 

    6、结果

    根据结果可以看到,既做到了历史消息用户的累计,也做到了用户的去重

    前言

    本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。

    1、数据

    数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)

    2、kafka topic

    首先在kafka建一个程序用到topic:KafkaUV

    1
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV

     

    3、创建checkpoint的hdfs目录

    我的目录为:/spark/dkl/kafka/UV_checkpoint

    1
    hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint

     

    4、Spark代码

    启动下面的程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    package com.dkl.leanring.spark.kafka

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object KafkaUV {
    def main(args: Array[String]): Unit = {
    //初始化,创建SparkSession
    val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate()
    //初始化,创建sparkContext
    val sc = spark.sparkContext
    //初始化,创建StreamingContext,batchDuration为5秒
    val ssc = new StreamingContext(sc, Seconds(5))

    //开启checkpoint机制
    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint")

    //kafka集群地址
    val server = "ambari.master.com:6667"

    //配置消费者
    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> server, //kafka集群地址
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "UpdateStateBykeyWordCount", //消费者组名
    "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量 earliest 、none
    "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("KafkaUV") //消费主题

    //基于Direct方式创建DStream
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //开始执行WordCount程序

    //以空格为切分符切分单词,并转化为 (word,1)形式
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    val wordCounts = words.updateStateByKey(
    //每个单词每次batch计算的时候都会调用这个函数
    //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
    //第二个参数为这个key对应的之前的状态
    (values: Seq[Int], state: Option[Int]) => {

    var newValue = state.getOrElse(0)
    values.foreach(newValue += _)
    Option(newValue)

    })

    //共享变量,便于后面的比较是否用新的uv
    val accum = sc.longAccumulator("uv")

    wordCounts.foreachRDD(rdd => {

    //如果uv增加
    if (rdd.count > accum.value) {
    //打印uv
    println(rdd.count)
    //将共享变量的值更新为新的uv
    accum.add(rdd.count - accum.value)
    }
    })

    ssc.start()
    ssc.awaitTermination()

    }

    }

     

    5、生产几条数据

    随便写几条即可

    1
    bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV

    6、结果

    根据结果可以看到,既做到了历史消息用户的累计,也做到了用户的去重

     
  • 相关阅读:
    CGI与FastCGI
    Google Protocol Buffer 的使用和原理
    AMQP协议
    Java 多线程 并发编程
    深入理解HashMap
    Bitmap 位图
    Bloom Filter概念和原理
    BloomFilter–大规模数据处理利器
    java bitmap/bitvector的分析和应用
    Linux iptables 备忘
  • 原文地址:https://www.cnblogs.com/huanghanyu/p/13130593.html
Copyright © 2011-2022 走看看