zoukankan      html  css  js  c++  java
  • spark 数据分析

    // 练习:使用 Spark Streaming 监听 socket 端口

    // 手写 WordCount 的 Java 代码

    package com.swust.streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * Spark Streaming practice job: reads lines of text from a TCP socket and
     * prints a per-batch word count to standard output.
     *
     * <p>Usage: {@code TestSparkStreaming [host [port]]}. When no arguments are
     * given the job connects to {@code data005:9999}.
     */
    public class TestSparkStreaming {

        /** Socket host used when no host argument is supplied. */
        private static final String DEFAULT_HOST = "data005";

        /** Socket port used when no port argument is supplied. */
        private static final int DEFAULT_PORT = 9999;

        /** Length of one micro-batch, in milliseconds. */
        private static final long BATCH_INTERVAL_MS = 5000L;

        /**
         * Builds the streaming word-count pipeline, starts it and blocks until the
         * streaming context terminates.
         *
         * @param args optional {@code host} (args[0]) and {@code port} (args[1]) of the
         *             socket to read from; defaults are used for missing arguments
         * @throws NumberFormatException if the port argument is not a valid integer
         */
        public static void main(String[] args) {
            String host = args.length > 0 ? args[0] : DEFAULT_HOST;
            int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

            SparkConf conf = new SparkConf();
            // At least two local threads: one is occupied by the socket receiver,
            // the other processes the received batches.
            conf.setMaster("local[2]").setAppName("stream");
            JavaSparkContext jsc = new JavaSparkContext(conf);
    //        jsc.setLogLevel("error");
            JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(BATCH_INTERVAL_MS));

            // Listen on the socket; every received line becomes one stream element.
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

            // Split each line into tokens on runs of whitespace.
            JavaDStream<String> tokens = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split("\\s+")).iterator();
                }
            });

            // Empty lines and leading whitespace still produce an empty token;
            // drop those so they are not counted as a word.
            JavaDStream<String> words = tokens.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String token) {
                    return !token.isEmpty();
                }
            });

            // Pair every word with an initial count of 1.
            JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) {
                    return new Tuple2<>(word, 1);
                }
            });

            // Sum the counts per word within each batch.
            JavaPairDStream<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer left, Integer right) {
                    return left + right;
                }
            });

            // Print every (word, count) pair of every batch.
            counts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception {
                    pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        @Override
                        public void call(Tuple2<String, Integer> tp) {
                            System.out.println(tp._1+"-----------------"+tp._2);
                        }
                    });
                }
            });

            boolean interrupted = false;
            try {
                ssc.start();
                ssc.awaitTermination();
            } catch (InterruptedException e) {
                // Remember the interrupt; the flag is restored only after shutdown so
                // that it cannot disturb the stop sequence below.
                interrupted = true;
            } finally {
                // stop(true) also stops the underlying SparkContext, so nothing is
                // left running when main returns.
                ssc.stop(true);
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

    }
    

      

     

     

     

  • 相关阅读:
    rasa learning to rank
    为什么选择rasa
    rasa
    tf.tile
    tf.scatter_nd
    nlp中的数据增强之 google-uda
    递归-分治-动态规划-贪心
    递归算法的美妙
    数据不平衡问题
    WD(西部数据)硬盘,“必须从您要解锁的硬盘对应的WD Drive Unlock CD 运行WD Drive Unlock应用程序”错误解决办法
  • 原文地址:https://www.cnblogs.com/walxt/p/12759649.html
Copyright © 2011-2022 走看看