package iie.udps.example.operator.spark;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Time;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import com.google.common.io.Files;
import org.apache.spark.api.java.JavaPairRDD;
import com.google.common.base.Optional;

/**
 * Streaming word count over lines of text read from a TCP socket.
 *
 * <p>To run this on your local machine, you need to first run a Netcat server
 *
 * <pre>
 * $ nc -lk 9999
 * </pre>
 *
 * and run the example as
 *
 * <pre>
 * spark-submit --class iie.udps.example.operator.spark.JavaNetworkWordCount
 *     --master local[2] /home/xdf/test2.jar localhost 9999 /user/test/checkpoint/
 *     /home/xdf/outputFile /home/xdf/totalOutputFile
 * </pre>
 *
 * <p>As the usage message printed by {@link #main(String[])} states, in local
 * mode the master must be {@code local[n]} with n &gt; 1.
 *
 * <p>This example receives the data produced by the Netcat server, performs a
 * word count on it, and appends the per-batch result and the cumulative
 * (historical) result to two separate local files.
 */
public final class JavaNetworkWordCount {

	/** Batch interval of the streaming context, in milliseconds. */
	private static final long BATCH_INTERVAL_MS = 1000;

	/**
	 * Program entry point.
	 *
	 * @param args
	 *            exactly five values: {@code <hostname> <port>
	 *            <checkpoint-directory> <output-file> <total-output-file>}.
	 *            With any other argument count, or with a non-integer port,
	 *            an error is printed to stderr and the JVM exits with
	 *            status 1.
	 */
	@SuppressWarnings("serial")
	public static void main(String[] args) {
		if (args.length != 5) {
			printUsage(args);
			System.exit(1);
		}

		// Validate the port before touching the file system or creating the
		// streaming context, so a bad argument has no side effects.
		final String hostname = args[0];
		final int port;
		try {
			port = Integer.parseInt(args[1]);
		} catch (NumberFormatException e) {
			System.err.println("Invalid <port> '" + args[1]
					+ "': expected an integer");
			System.exit(1);
			return; // not reached; keeps 'port' definitely assigned
		}

		final String checkpointDirectory = args[2]; // checkpoint directory
		final String curOutputPath = args[3]; // path for the per-batch word counts
		final String totalOutputPath = args[4]; // path for the cumulative word counts

		System.out.println("Creating new context");

		// Start from fresh output files; results are appended batch by batch.
		final File curOutputFile = new File(curOutputPath);
		deleteStaleOutput(curOutputFile);
		final File totalOutputFile = new File(totalOutputPath);
		deleteStaleOutput(totalOutputFile);

		// Create a StreamingContext
		SparkConf conf = new SparkConf().setAppName("NetworkWordCount");
		final JavaStreamingContext jssc = new JavaStreamingContext(conf,
				new Duration(BATCH_INTERVAL_MS));
		jssc.checkpoint(checkpointDirectory);

		// Create a DStream that will connect to hostname:port, like
		// localhost:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream(
				hostname, port);

		// Split each line into words
		JavaDStream<String> words = lines
				.flatMap(new FlatMapFunction<String, String>() {
					@Override
					public Iterable<String> call(String x) {
						return Arrays.asList(x.split(" "));
					}
				});

		// Pair every word with a count of 1; this stream feeds both the
		// per-batch counts and the cumulative counts below.
		JavaPairDStream<String, Integer> pairs = words
				.mapToPair(new PairFunction<String, String, Integer>() {
					@Override
					public Tuple2<String, Integer> call(String s)
							throws Exception {
						return new Tuple2<String, Integer>(s, 1);
					}
				});

		// Count each word in each batch
		JavaPairDStream<String, Integer> runningCounts = pairs
				.reduceByKey(new Function2<Integer, Integer, Integer>() {
					@Override
					public Integer call(Integer i1, Integer i2)
							throws Exception {
						return i1 + i2;
					}
				});
		appendCountsTo(runningCounts, curOutputFile);

		// Adds the occurrences seen in the current batch to the total kept
		// as state for the word (0 when there is no previous state).
		Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
				new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
			@Override
			public Optional<Integer> call(List<Integer> values,
					Optional<Integer> state) {
				int sum = state.or(0);
				for (Integer value : values) {
					sum += value;
				}
				return Optional.of(Integer.valueOf(sum));
			}
		};

		// Cumulative count of each word across batches
		JavaPairDStream<String, Integer> totalCounts = pairs
				.updateStateByKey(updateFunction);
		appendCountsTo(totalCounts, totalOutputFile);

		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
		System.exit(0);
	}

	/**
	 * Prints the received arguments and the usage text to stderr.
	 *
	 * @param args
	 *            the command-line arguments as received by {@code main}
	 */
	private static void printUsage(String[] args) {
		System.err.println("You arguments were " + Arrays.asList(args));
		System.err
				.println("Usage: JavaNetworkWordCount <hostname> <port> <checkpoint-directory> "
						+ " <output-file> <total-output-file>. <hostname> and <port> describe the TCP server that Spark "
						+ " Streaming would connect to receive data. <checkpoint-directory> directory to "
						+ " HDFS-compatible file system which checkpoint data <output-file> file to which "
						+ " the word counts will be appended "
						+ " <total-output-file> file to which the total word counts will be appended "
						+ " "
						+ "In local mode, <master> should be 'local[n]' with n > 1 "
						+ "Both <checkpoint-directory> and <output-file> and <total-output-file> must be absolute paths");
	}

	/**
	 * Deletes {@code file} if it exists. A failed deletion is reported on
	 * stderr instead of being ignored, because later results are appended to
	 * whatever content is left in the file.
	 *
	 * @param file
	 *            output file left over from an earlier run, if any
	 */
	private static void deleteStaleOutput(File file) {
		if (file.exists() && !file.delete()) {
			System.err.println("WARNING: could not delete existing output file "
					+ file.getAbsolutePath()
					+ "; new results will be appended to it");
		}
	}

	/**
	 * Registers an output action on {@code counts} that, for every batch,
	 * collects the batch's pairs, prints them to stdout and appends them to
	 * {@code outputFile} using the platform default charset.
	 *
	 * @param counts
	 *            stream of (word, count) pairs to report
	 * @param outputFile
	 *            file each batch's record is appended to
	 */
	@SuppressWarnings("serial")
	private static void appendCountsTo(
			JavaPairDStream<String, Integer> counts, final File outputFile) {
		counts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
			@Override
			public Void call(JavaPairRDD<String, Integer> rdd, Time time)
					throws IOException {
				String record = "Counts at time " + time + " "
						+ rdd.collect();
				System.out.println(record);
				System.out.println("Appending to "
						+ outputFile.getAbsolutePath());
				Files.append(record + " ", outputFile,
						Charset.defaultCharset());
				return null;
			}
		});
	}
}