zoukankan      html  css  js  c++  java
  • Spark Streaming 实现接收网络传输数据进行WordCount功能

    package iie.udps.example.operator.spark;
    
    import scala.Tuple2;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.Time;
    
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.Arrays;
    import java.util.List;
    
    import com.google.common.io.Files;
    
    import org.apache.spark.api.java.JavaPairRDD;
    
    import com.google.common.base.Optional;
    
    /**
     * To run this on your local machine, you need to first run a Netcat server
     * 
     * `$ nc -lk 9999`
     * 
     * and run the example as
     * 
     * spark-submit --class iie.udps.example.operator.spark.JavaNetworkWordCount
     * --master local /home/xdf/test2.jar localhost 9999 /user/test/checkpoint/
     * /home/xdf/outputFile /home/xdf/totalOutputFile
     * 
     * 此示例接收Netcat server产生的数据,进行WordCount操作,分别输出当前结果和历史结果到本地文件中
     */
    public final class JavaNetworkWordCount {
    
    	@SuppressWarnings("serial")
    	public static void main(String[] args) {
    
    		if (args.length != 5) {
    			System.err.println("You arguments were " + Arrays.asList(args));
    			System.err
    					.println("Usage: JavaNetworkWordCount <hostname> <port> <checkpoint-directory>
    "
    							+ "     <output-file> <total-output-file>. <hostname> and <port> describe the TCP server that Spark
    "
    							+ "     Streaming would connect to receive data. <checkpoint-directory> directory to
    "
    							+ "     HDFS-compatible file system which checkpoint data <output-file> file to which
    "
    							+ "     the word counts will be appended
    "
    							+ "     <total-output-file> file to which the total word counts will be appended
    "
    							+ "
    "
    							+ "In local mode, <master> should be 'local[n]' with n > 1
    "
    							+ "Both <checkpoint-directory> and <output-file> and <total-output-file> must be absolute paths");
    			System.exit(1);
    		}
    
    		final String checkpointDirectory = args[2]; // 检查点目录
    		final String curOutputPath = args[3];// 输出当前WordCount结果的路径
    		final String totalOutputPath = args[4];// 输出全部累计WordCount结果的路径
    		System.out.println("Creating new context");
    		final File curOutputFile = new File(curOutputPath);
    		if (curOutputFile.exists()) {
    			curOutputFile.delete();
    		}
    		final File totalOutputFile = new File(totalOutputPath);
    		if (totalOutputFile.exists()) {
    			totalOutputFile.delete();
    		}
    		// Create a StreamingContext
    		SparkConf conf = new SparkConf().setAppName("NetworkWordCount");
    		final JavaStreamingContext jssc = new JavaStreamingContext(conf,
    				new Duration(1000));
    
    		jssc.checkpoint(checkpointDirectory);
    
    		// Create a DStream that will connect to hostname:port, like
    		// localhost:9999
    		JavaReceiverInputDStream<String> lines = jssc.socketTextStream(args[0],
    				Integer.parseInt(args[1]));
    
    		// Split each line into words
    		JavaDStream<String> words = lines
    				.flatMap(new FlatMapFunction<String, String>() {
    					@Override
    					public Iterable<String> call(String x) {
    						return Arrays.asList(x.split(" "));
    					}
    				});
    
    		// Count each word in each batch
    		JavaPairDStream<String, Integer> pairs = words
    				.mapToPair(new PairFunction<String, String, Integer>() {
    					@Override
    					public Tuple2<String, Integer> call(String s)
    							throws Exception {
    						return new Tuple2<String, Integer>(s, 1);
    					}
    				});
    		JavaPairDStream<String, Integer> runningCounts = pairs
    				.reduceByKey(new Function2<Integer, Integer, Integer>() {
    					@Override
    					public Integer call(Integer i1, Integer i2)
    							throws Exception {
    						return i1 + i2;
    					}
    				});
    
    		runningCounts
    				.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    					@Override
    					public Void call(JavaPairRDD<String, Integer> rdd, Time time)
    							throws IOException {
    						String counts = "Counts at time " + time + " "
    								+ rdd.collect();
    						System.out.println(counts);
    						System.out.println("Appending to "
    								+ curOutputFile.getAbsolutePath());
    						Files.append(counts + "
    ", curOutputFile,
    								Charset.defaultCharset());
    						return null;
    					}
    				});
    
    		Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    			@Override
    			public Optional<Integer> call(List<Integer> values,
    					Optional<Integer> state) {
    				Integer newSum = state.or(0);
    				for (Integer i : values) {
    					newSum += i;
    				}
    				return Optional.of(newSum);
    			}
    		};
    
    		JavaPairDStream<String, Integer> TotalCounts = words.mapToPair(
    				new PairFunction<String, String, Integer>() {
    					@Override
    					public Tuple2<String, Integer> call(String s) {
    						return new Tuple2<String, Integer>(s, 1);
    					}
    				}).updateStateByKey(updateFunction);
    
    		TotalCounts
    				.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    					@Override
    					public Void call(JavaPairRDD<String, Integer> rdd, Time time)
    							throws IOException {
    						String counts = "Counts at time " + time + " "
    								+ rdd.collect();
    						System.out.println(counts);
    						System.out.println("Appending to "
    								+ totalOutputFile.getAbsolutePath());
    						Files.append(counts + "
    ", totalOutputFile,
    								Charset.defaultCharset());
    						return null;
    					}
    				});
    
    		jssc.start(); // Start the computation
    		jssc.awaitTermination(); // Wait for the computation to terminate
    		System.exit(0);
    	}
    
    }
    

      

  • 相关阅读:
    统计学习方法学习笔记第二章(感知机)
    filebeat句柄占用问题
    小组年终总结的汇总
    使用docker制作zookeeper镜像
    普罗米修斯在k8s上面的部署
    k8s亲和性和反亲和性的理解
    AlertManager 的在k8s集群上面的安装部署使用
    filebeat生产上面镜像制作的流程
    普罗米修斯生产上面的性能优化点
    AlertManger集群的搭建
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5027179.html
Copyright © 2011-2022 走看看