zoukankan      html  css  js  c++  java
  • SparkStreaming+Kafka 实现基于缓存的实时wordcount

    前言

    本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreaming的wordcount程序比如官网上的,只能统计最新时间间隔内的每个单词的数量,而不能将历史的累加起来,本文是看了教程之后,自己实现了一下kafka的程序,记录在这里。其实没什么难度,只是用了一个updateStateByKey算子就能实现,因为第一次用这个算子,所以正好学习一下。

    1、数据

    数据是我随机在kafka里生产的几条,单词以空格区分开

    2、kafka topic

    首先在kafka建一个程序用到topic:UpdateStateBykeyWordCount

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic UpdateStateBykeyWordCount

    3、创建checkpoint的hdfs目录

    我的目录为:/spark/dkl/kafka/wordcount_checkpoint

    hadoop fs -mkdir -p /spark/dkl/kafka/wordcount_checkpoint

    4、Spark代码

    启动下面的程序

    package com.dkl.leanring.spark.kafka
    
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.Seconds
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    object UpdateStateBykeyWordCount {
    
      def main(args: Array[String]): Unit = {
        //初始化,创建SparkSession
        val spark = SparkSession.builder().appName("sskt").master("local[2]").enableHiveSupport().getOrCreate()
        //初始化,创建sparkContext
        val sc = spark.sparkContext
        //初始化,创建StreamingContext,batchDuration为1秒
        val ssc = new StreamingContext(sc, Seconds(5))
    
        //开启checkpoint机制
        ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/wordcount_checkpoint")
    
        //kafka集群地址
        val server = "ambari.master.com:6667"
    
        //配置消费者
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> server, //kafka集群地址
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "UpdateStateBykeyWordCount", //消费者组名
          "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量   earliest 、none
          "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
        val topics = Array("UpdateStateBykeyWordCount") //消费主题
    
        //基于Direct方式创建DStream
        val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    
        //开始执行WordCount程序
    
        //以空格为切分符切分单词,并转化为 (word,1)形式
        val words = stream.flatMap(_.value().split(" ")).map((_, 1))
        val wordCounts = words.updateStateByKey(
          //每个单词每次batch计算的时候都会调用这个函数
          //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
          //第二个参数为这个key对应的之前的状态
          (values: Seq[Int], state: Option[Int]) => {
    
            var newValue = state.getOrElse(0)
            values.foreach(newValue += _)
            Option(newValue)
    
          })
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
    }

    5、生产几条数据

    随便写几条即可

    bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic UpdateStateBykeyWordCount 

    6、结果

    根据结果可以看到,历史的单词也被统计打印出来了



  • 相关阅读:
    Android反编译apk并重新打包签名(Mac环境)
    Android Studio修改apk打包生成名称
    Mac搭建SVN服务器+Cornerstone连接服务器
    Android Studio中使用Git进行代码管理(分支、合并)
    Android Studio之SVN打分支、切换分支及合并分支
    一次真实的线上OOM问题定位
    水平分库如何做到平滑扩展
    case when / if else-if 的大坑,要当心!!!
    项目启动时 xml报错:Could not find SQL statement to include with refid 'mbgl.panDuanZbsfkxg'
    只需要返回一条数据,并且必须返回一条数据的时候的写法
  • 原文地址:https://www.cnblogs.com/huanghanyu/p/13130548.html
Copyright © 2011-2022 走看看