zoukankan      html  css  js  c++  java
  • 55、Spark Streaming:updateStateByKey以及基于缓存的实时wordcount程序

    一、updateStateByKey

    1、概述

    SparkStreaming 7*24 小时不间断的运行,有时需要管理一些状态,比如wordCount,每个batch的数据不是独立的而是需要累加的,这时就需要sparkStreaming来维护一些状态,
    目前有两种方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存状态的方案,官方声称相比updateStateByKey有10倍性能提升。
    
    
    updateStateByKey底层是将preSateRDD和parentRDD进行co-group,然后对所有数据都将经过自定义的mapFun函数进行一次计算,即使当前batch只有一条数据也会进行这么复杂的计算,
    大大的降低了性能,并且计算时间会随着维护的状态的增加而增加。
    
    mapWithstate底层是创建了一个MapWithStateRDD,存的数据是MapWithStateRDDRecord对象,一个Partition对应一个MapWithStateRDDRecord对象,该对象记录了对应Partition所有的
    状态,每次只会对当前batch有的数据进行跟新,而不会像updateStateByKey一样对所有数据计算。
    
    
    
    updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。
    1、首先,要定义一个state,可以是任意的数据类型;
    2、其次,要定义state更新函数——>指定一个函数如何使用之前的state和新值来更新state。
    
    对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,
    那么key对应的state就会被删除。
    
    当然,对于每个新出现的key,也会执行state更新函数。
    
    注意,updateStateByKey操作,要求必须开启Checkpoint机制。
    
    案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的)

    2、java案例

    package cn.spark.study.streaming;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import com.google.common.base.Optional;
    
    import scala.Tuple2;
    
    public class UpdateStateByKeyWordCount {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            
            // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
            // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
            // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
            // 内存数据丢失的时候,可以从checkpoint中恢复数据
            
            // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
            jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint");
            
            // 然后先实现基础的wordcount逻辑
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
            
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });
            
            JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });
            
            // 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey
            // 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
            // 然后,可以打印出那个时间段的单词计数
            // 但是,有个问题,你如果要统计每个单词的全局的计数呢?
            // 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
            // 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
            
            // 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
            // 这里的Optional,相当于Scala中的样例类,就是Option,可以这么理解
            // 它代表了一个值的存在状态,可能存在,也可能不存在
            JavaPairDStream<String, Integer> wordConts = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    
                private static final long serialVersionUID = 1L;
    
                // 这里两个参数
                // 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数
                // 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧
                // 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)
                // 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的
                @Override
                public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                    // 首先定义一个全局的单词计数
                    Integer newValue = 0;
                    
                    // 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现
                    // 如果存在,说明这个key之前已经统计过全局的次数了
                    if(state.isPresent()) {
                        newValue = state.get();
                    }
                    
                    // 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计次数
                    for(Integer value : values) {
                        newValue += value;
                    }
                    
                    return Optional.of(newValue);
                }
            });
            
            // 到这里为止,相当于是,每个batch过来是,计算到pairs DStream,就会执行全局的updateStateByKey
            // 算子,updateStateByKey返回的JavaPairDStream,其实就代表了每个key的全局的计数
            // 打印出来
            wordConts.print();
            
            jssc.start();
            jssc.awaitTermination();
            jssc.close();
        }
    }
    
    
    
    
    
    ##运行脚本
    [root@spark1 streaming]# cat updateStateByKeyWordCount.sh 
    /usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit 
    --class cn.spark.study.streaming.UpdateStateByKeyWordCount 
    --num-executors 3 
    --driver-memory 100m 
    --executor-memory 100m 
    --executor-cores 3 
    --files /usr/local/hive/conf/hive-site.xml 
    --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar 
    /usr/local/spark-study/java/streaming/saprk-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
    
    
    
    
    ##在另一个窗口打开nc,然后输入内容:
    [root@spark1 kafka]# nc -lk 9999
    hello you
    hello word
    hello you me
    
    
    ##结果,相同的词每次都会全局统计
    (hello,3)
    (word,1)
    (me,1)
    (you,2)

    3、scala案例

    package cn.spark.study.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    
    object UpdateStateByKeyWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint")
        val lines = ssc.socketTextStream("spark1", 9999)
        val words = lines.flatMap( _.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          var newValue = state.getOrElse(0)
          for(value <- values) {
            newValue += value
          }
          Option(newValue)
        })
        
        wordCounts.print()
        
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    
    
    
    
    ##在另一个窗口打开nc,然后输入内容:
    [root@spark1 kafka]# nc -lk 9999
    hello word
    hello you
    
    
    ##结果,相同的词每次都会全局统计
    (hello,2)
    (word,1)
    (you,1)
  • 相关阅读:
    socket.io带中文时客户端无法响应
    JQ树插件 — zTree笔记
    cecium 笔记
    express处理跨域问题,中间件 CORS
    一些接口
    express 3.5 Err: request aborted
    ovirt kvm嵌套虚拟化
    kvm实现快速增量盘模式的克隆脚本
    kvm命令
    kvm 中 Guest Is already in use 处理办法
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11350718.html
Copyright © 2011-2022 走看看