  • spark streaming

    discretized: a DStream ("discretized stream") is a continuous stream broken into a sequence of RDDs, one per batch interval.
    
    http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#overview
    

     

    [root@node5 ~]# yum install nc        # install netcat
    
    [root@node5 ~]# nc -lk 9999           # listen on port 9999; lines typed here feed the socket stream
    
    
    
    
    
    
    package com.bjsxt.spark;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    
    public class SparkStreamingTest {
    	public static void main(String[] args) {
    		// With local[1] there is only one task, and it is occupied by the receiver, so no thread is left to run the print/output jobs
    		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingTest");
    		
    		final JavaSparkContext sc = new JavaSparkContext(conf);
    		
    		sc.setLogLevel("WARN");
    		
    		JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5));
    		
    		JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("node5",9999);
    		
    		JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String,String>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterable<String> call(String lines) throws Exception {
    				return Arrays.asList(lines.split(" "));
    			}
    		});
    		
    		
    		JavaPairDStream<String,Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<String, Integer> call(String word) throws Exception {
    				return new Tuple2<String, Integer>(word,1);
    			}
    		});
    		JavaPairDStream<String, Integer> reduceByKey = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
    			
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				System.out.println("rdd reduceByKey************************");
    				return v1 + v2;
    			}
    		});
    		
    		
    //		reduceByKey.print(1000);
    		/**
    		 * foreachRDD exposes the RDDs inside the DStream. RDD transformation operators can be applied to the RDD it hands you,
    		 * but an action operator must be used on that RDD to trigger execution, otherwise the body of foreachRDD never runs.
    		 * Inside foreachRDD's call method, the code outside the RDD operators runs on the Driver; this can be used to
    		 * dynamically update a broadcast variable (for example by re-reading a configuration file).
    		 */
    		reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public void call(JavaPairRDD<String, Integer> rdd)
    					throws Exception {
    				// The correct way to obtain the SparkContext on the Driver side, and from it the broadcast variable.
    				SparkContext context = rdd.context();
    				JavaSparkContext sc = new JavaSparkContext(context);
    				Broadcast<String> broadcast = sc.broadcast("hello");
    				String value = broadcast.value();
    				
    				System.out.println("Driver.........");
    				JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {
    
    					/**
    					 * 
    					 */
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Tuple2<String, Integer> call(
    							Tuple2<String, Integer> tuple) throws Exception {
    						System.out.println("Executor.........");
    						
    						return new Tuple2<String,Integer>(tuple._1,tuple._2);
    					}
    				});
    				
    				mapToPair.foreach(new VoidFunction<Tuple2<String,Integer>>() {
    
    					/**
    					 * 
    					 */
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public void call(Tuple2<String, Integer> arg0)
    							throws Exception {
    						System.out.println(arg0);
    					}
    				});
    			}
    		});
    		jsc.start();
    		jsc.awaitTermination();
    		jsc.stop();
    		
    		
    	}
    }
    
    
    
    [root@node5 ~]# nc -lk 9999   ## send socket data from the Linux shell
    hello sxt 
    hello bj
    hello
    hello
    zhongguo
    zhngguo
    zhongguo
    
     * foreachRDD can dynamically re-read a broadcast variable (a minimal sketch follows below);
     *   being able to obtain a JavaSparkContext inside it shows that this part of the code runs on the Driver.
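
    A minimal sketch of the "dynamically update a broadcast variable from a configuration file" idea mentioned above, written against the SparkStreamingTest class: on every batch, the Driver-side part of foreachRDD re-reads a blacklist file and rebroadcasts it, so the executors always filter against the latest contents. The file name blacklist.txt, the filtering logic, and the extra imports it assumes (java.util.List, java.nio.file.Files/Paths, java.nio.charset.StandardCharsets, org.apache.spark.api.java.function.Function) are illustrative assumptions, not part of the original code.

    		reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {

    			private static final long serialVersionUID = 1L;

    			@Override
    			public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
    				// Driver side: executed once per batch, so the file is re-read every batch interval.
    				JavaSparkContext sc = new JavaSparkContext(rdd.context());
    				List<String> blackList = Files.readAllLines(
    						Paths.get("blacklist.txt"), StandardCharsets.UTF_8);   // hypothetical config file
    				final Broadcast<List<String>> bc = sc.broadcast(blackList);    // rebroadcast the fresh value

    				// Executor side: the tasks read the latest broadcast value.
    				rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {

    					private static final long serialVersionUID = 1L;

    					@Override
    					public Boolean call(Tuple2<String, Integer> tuple) throws Exception {
    						return !bc.value().contains(tuple._1);
    					}
    				}).foreach(new VoidFunction<Tuple2<String, Integer>>() {

    					private static final long serialVersionUID = 1L;

    					@Override
    					public void call(Tuple2<String, Integer> tuple) throws Exception {
    						System.out.println(tuple);
    					}
    				});
    			}
    		});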
    
    Checkpointed state is kept in memory as well as saved on disk.
    
    package com.bjsxt.sparkstreaming;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import com.google.common.base.Optional;
    
    import scala.Tuple2;
    
    /**
     * What updateStateByKey does:
     * 1. It maintains a state for every key in the Spark Streaming application. The state can be of any type,
     *    including a user-defined object, and the update function is user-defined as well.
     * 2. Through the update function the state of each key is continuously refreshed: for every new batch,
     *    Spark Streaming updates the state of keys that already exist, e.g.
     * 
     * hello,3
     * bjsxt,2
     * 
     * Because each key's state must be kept and recovered on failure, the checkpoint mechanism has to be enabled.
     * 
     * Use case: cumulative ad-click analytics
     * @author root
     *
     * What is it for? Counting ad-click traffic, the day's vehicle traffic, running click totals.
     */
    
    public class UpdateStateByKeyOperator {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyDemo");
    		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
    		/**
    		 * Set the checkpoint directory.
    		 * 
    		 * How often is the in-memory data (the state of every key) written out to disk?
    		 * 	If your batchInterval is less than 10 s, the state is written to disk every 10 s.
    		 * 	If batchInterval is greater than 10 s, it is written once per batchInterval.
    		 * 
    		 * This avoids writing to HDFS too frequently.
    		 */
    		
    //		JavaSparkContext sc = jsc.sparkContext();
    //		sc.setCheckpointDir("./checkpoint");
    // 		jsc.checkpoint("hdfs://node1:9000/spark/checkpoint");
     		jsc.checkpoint("./checkpoint");
     		
    		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);
    
    		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterable<String> call(String s) {
    				return Arrays.asList(s.split(" "));
    			}
    		});
    
    		JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<String, Integer> call(String s) {
    				return new Tuple2<String, Integer>(s, 1);
    			}
    		});
    
    		JavaPairDStream<String, Integer> counts = 
    				ones.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
    				/**
    				 * values: the values collected for this key in the current batch after grouping, e.g. [1,1,1,1,1]
    				 * state:  the state of this key accumulated before this batch
    				 */
    				Integer updateValue = 0 ;
    				 if(state.isPresent()){
    					 updateValue = state.get();
    				 }
    				 
    				for(Integer value : values) {
    					 updateValue += value;
    				}
    				return Optional.of(updateValue);
    			}
    		});
    		
    		//output operator
     		counts.print();
     		
     		jsc.start();
     		jsc.awaitTermination();
     		jsc.close();
    	}
    }
    
    
    
    
    // The window length and the slide interval must both be integer multiples of the batch interval
    
    
    package com.bjsxt.sparkstreaming;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    
    /**
     * Real-time hot search-word statistics based on a sliding window
     * @author root
     *
     */
    public class WindowOperator {
    	
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf()
    				.setMaster("local[2]")
    				.setAppName("WindowHotWord"); 
    		
    		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    		/**
    		 * Set the log level to WARN
    		 *
    		 */
    		jssc.sparkContext().setLogLevel("WARN");
    		/**
    		 * Note:
    		 *  The unoptimized window function does not need a checkpoint directory.
    		 *  The optimized window function (with an inverse-reduce function) must set one.
    		 */
    //   		jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
       		jssc.checkpoint("./checkpoint"); 
    		JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node5", 9999);
    //		JavaDStream<String> window = searchLogsDStream.window(Durations.seconds(15), Durations.seconds(5));
    		//word	1
    		JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterable<String> call(String t) throws Exception {
    				return Arrays.asList(t.split(" "));
    			}
    		});
    		
    		// Map each search word to a (searchWord, 1) tuple
    		JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
    				
    				new PairFunction<String, String, Integer>() {
    
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Tuple2<String, Integer> call(String searchWord)
    							throws Exception {
    						return new Tuple2<String, Integer>(searchWord, 1);
    					}
    					
    				});
    		/**
    		 * Every 10 seconds, compute over the last 60 seconds of data: the window is 60 seconds and (with a 5-second
    		 * batch interval) contains 12 RDDs. These RDDs are not computed until the window fires; at that point the
    		 * 12 RDDs are combined and reduceByKeyAndWindow is executed over them.
    		 * reduceByKeyAndWindow operates on the window as a whole rather than on each DStream batch.
    		 */
    //	   	 JavaPairDStream<String, Integer> searchWordCountsDStream = 
    //				
    //				searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    //
    //					private static final long serialVersionUID = 1L;
    //
    //					@Override
    //					public Integer call(Integer v1, Integer v2) throws Exception {
    //						return v1 + v2;
    //					}
    //		}, Durations.seconds(15), Durations.seconds(5)); 
    		
    		
    		/**
    		 * Optimized window operation (incremental: a reduce function plus an inverse-reduce function):
    		 */
      	   JavaPairDStream<String, Integer> searchWordCountsDStream = 
    		
    		 searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				return v1 + v2;
    			}
    			
    		},new Function2<Integer, Integer, Integer>() {
    
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				return v1 - v2;
    			}
    			
    		}, Durations.seconds(15), Durations.seconds(5));    
    
    	  	searchWordCountsDStream.print();
    		
    		jssc.start(); 	
    		jssc.awaitTermination();
    		jssc.close();
    	}
    
    }
    
    
    
    // Run in local mode with two threads
    SparkConf conf = new SparkConf()
    				.setMaster("local[2]")
    				.setAppName("WindowHotWord"); 
    

     

    The plain window mechanism does not need a checkpoint; the optimized sliding-window mechanism must set one. It works incrementally: subtract the batches that slide out of the window and add the batches that slide in. For example, with a 15 s window sliding every 5 s, the new window count equals the previous window count minus the counts of the 5 s batch that just left the window plus the counts of the 5 s batch that just entered.
    

     

    package com.bjsxt.sparkstreaming;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    /**
     * Black-list filtering with the transform operation.
     * A DStream can perform arbitrary RDD-to-RDD operations through transform.
     * @author root
     *
     */
    public class TransformOperator {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf();
    		conf.setMaster("local[2]").setAppName("transform");
    		JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
    		
    		// black list
    		List<String> list = Arrays.asList("zhangsan");
    		final Broadcast<List<String>> bcBlackList = jsc.sparkContext().broadcast(list);
    		
    		// receive data from the socket source
    		JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999);
    		JavaPairDStream<String, String> pairNameList = 
    				nameList.mapToPair(new PairFunction<String, String, String>() {
    
    			/**
    			 *
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<String, String> call(String s) throws Exception {
    				return new Tuple2<String, String>(s.split(" ")[1], s);
    			}
    		});
    		/**
    		 * transform exposes the RDD inside the DStream and performs an RDD-to-RDD conversion; it does not need an
    		 * action operator to trigger it, but it must return an RDD.
    		 * Note: inside transform's call method, code outside the RDD operators runs on the Driver, so it too can be
    		 * used to dynamically update a broadcast variable.
    		 */
    		JavaDStream<String> transFormResult =
    				pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD)
    					throws Exception {
    				
    				 JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String,String>, Boolean>() {
    
    					/**
    					 * 
    					 */
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Boolean call(Tuple2<String, String> tuple) throws Exception {
    						return !bcBlackList.value().contains(tuple._1);
    					}
    				});
    				
    				JavaRDD<String> map = filter.map(new Function<Tuple2<String,String>, String>() {
    
    					/**
    					 * 
    					 */
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public String call(Tuple2<String, String> tuple) throws Exception {
    						return tuple._2;
    					}
    				});
    				// return the filtered result
    				return map;
    			}
    		});
    		
    		transFormResult.print();
    		
    		jsc.start();
    		jsc.awaitTermination();
    		jsc.stop();
    	}
    }
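
    For reference, a possible run (assuming each input line sent with nc -lk 9999 has the form "id name"): a line such as "1 zhangsan" is dropped because its second field, zhangsan, is on the black list, while "2 lisi" passes the filter and is printed back unchanged by transFormResult.print().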
    

      

    Monitoring a socket needs 2 tasks, because under the hood Spark Streaming dedicates one task to continuously receiving from the socket while another runs the processing, hence setMaster("local[2]").
    Monitoring a directory does not need a receiver, so a single task is enough: setMaster("local") works.
    
    Spark Streaming monitoring a directory:
    package streamingOperate.transformations;
    
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    /**
     * filter: keeps only the records for which the given function returns true, producing a new DStream.
     * @author root
     *
     */
    public class Operate_filter {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Operate_filter");
    		JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
    		JavaDStream<String> textFileStream = jsc.textFileStream("data");
    		/**
    		 * Each line of the monitored files is read as one record; the example below keeps only the lines
    		 * that start with "a 100".
    		 */
    		textFileStream.filter(new Function<String,Boolean>(){
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			public Boolean call(String line) throws Exception {
    				return line.startsWith("a 100");
    			}
    			
    		}).print(1000);
    		
    		jsc.start();
    		jsc.awaitTermination();
    		jsc.close();
    	}
    }
    
    
    // Files that are modified in place cannot be monitored; the operation that puts a file into the directory must be atomic (see the sketch after the copy program below).
    package streamingOperate.util;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.UUID;
    /**
     * This program simulates dynamically generating identically formatted txt files in the data directory,
     * providing an input stream for textFileStream in Spark Streaming.
     * @author root
     *
     */
    public class CopyFile_data1 {
    	public static void main(String[] args) throws IOException, InterruptedException {
    		while(true){
    			Thread.sleep(5000);
    			String uuid = UUID.randomUUID().toString();
    			System.out.println(uuid);
    			copyFile(new File("data1.txt"), new File("./data/" + uuid + "----data1.txt"));
    		}
    	}
    
    	public static void copyFile(File fromFile, File toFile) throws IOException {
    		FileInputStream ins = new FileInputStream(fromFile);
    		FileOutputStream out = new FileOutputStream(toFile);
    		byte[] b = new byte[1024];
    		int n = 0;
    		while ((n = ins.read(b)) != -1) {
    			out.write(b, 0, n); // write only the bytes actually read
    		}
    
    		ins.close();
    		out.close();
    	}
    }
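
    The copy program above writes straight into the monitored directory, so a slow copy could be picked up half-written. A minimal sketch of the usual fix, under the assumption that a staging directory (here tmp/) on the same filesystem is available: copy the file outside the monitored directory first and then move it into place, so textFileStream sees it appear in a single step. The class name, directory names, and the use of java.nio.file are illustrative, not part of the original code.

    package streamingOperate.util;

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;

    public class CopyFileAtomically {
    	public static void main(String[] args) throws IOException, InterruptedException {
    		while (true) {
    			Thread.sleep(5000);
    			String uuid = UUID.randomUUID().toString();
    			File tmp = new File("tmp/" + uuid + "----data1.txt");     // staging directory, not monitored
    			File target = new File("data/" + uuid + "----data1.txt"); // monitored directory
    			tmp.getParentFile().mkdirs();
    			// 1. copy into the staging directory (may take arbitrarily long)
    			Files.copy(new File("data1.txt").toPath(), tmp.toPath(),
    					StandardCopyOption.REPLACE_EXISTING);
    			// 2. move the finished file into the monitored directory in one step
    			Files.move(tmp.toPath(), target.toPath(),
    					StandardCopyOption.ATOMIC_MOVE);
    			System.out.println("published " + target.getName());
    		}
    	}
    }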
    
    
    
    
    package streamingOperate.output;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.dstream.DStream;
    
    /**
     * saveAsTextFiles(prefix, [suffix]):
     * Save the contents of this DStream as text files. The file name generated for each batch is based on
     * prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
     * 
     * Notes:
     * saveAsTextFile is implemented on top of saveAsHadoopFile.
     * An ordinary RDD in Spark can be saved locally with saveAsTextFile(path), but a DStream only offers
     * saveAsTextFiles(), which has no path parameter, only prefix and suffix.
     * Internally, DStream.saveAsTextFiles calls RDD.saveAsTextFile, so the output path has to be encoded in the prefix.
     *
     */
    public class Operate_saveAsTextFiles {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local").setAppName("Operate_saveAsTextFiles");
    		JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
    		JavaDStream<String> textFileStream = jsc.textFileStream("./data");
    		
    		JavaDStream<String> flatMap = textFileStream.flatMap(new FlatMapFunction<String, String>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			public Iterable<String> call(String t) throws Exception {
    				
    				return Arrays.asList(t.split(" "));
    			}
    		});
    		// Save files under ./savedata in the current directory, named with the given prefix and ending with the suffix.
    		DStream<String> dstream = flatMap.dstream();
    //		dstream.saveAsTextFiles("./savedata/prefix", "suffix");
    		dstream.saveAsTextFiles("./savedata/spark/mydate", "aaaa");
    		
    		jsc.start();
    		jsc.awaitTermination();
    		jsc.close();
    	}
    }
    

      

     

      

      

      

     

  • Original post: https://www.cnblogs.com/xhzd/p/11595288.html