zoukankan      html  css  js  c++  java
  • spark 数据分析

    //练习:使用 Spark Streaming 监听 socket 端口

    //手写 WordCount 的 Java 代码

    package com.swust.streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * Spark Streaming exercise: reads lines of text from a TCP socket and prints,
     * for every micro-batch, how many times each space-separated word occurred.
     *
     * <p>Usage: {@code TestSparkStreaming [host [port]]}. Both arguments are optional;
     * when omitted the job connects to {@code data005:9999}.
     */
    public class TestSparkStreaming {
        /** Host to connect to when no host argument is supplied. */
        private static final String DEFAULT_HOST = "data005";
        /** Port to connect to when no port argument is supplied. */
        private static final int DEFAULT_PORT = 9999;
        /** Length of one micro-batch, in milliseconds. */
        private static final long BATCH_INTERVAL_MS = 5000L;

        /**
         * Builds the streaming word-count pipeline, starts it and blocks until it terminates.
         *
         * @param args optional {@code [host, port]} of the socket text source
         * @throws NumberFormatException if a port argument is given but is not an integer
         */
        public static void main(String[] args) {
            String host = (args != null && args.length > 0) ? args[0] : DEFAULT_HOST;
            int port = (args != null && args.length > 1) ? Integer.parseInt(args[1]) : DEFAULT_PORT;

            SparkConf conf = new SparkConf();
            // Two local threads: the socket receiver occupies one, leaving one for processing.
            conf.setMaster("local[2]").setAppName("stream");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(BATCH_INTERVAL_MS));

            // Listen on the socket; every received line becomes one stream element.
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

            // Word count: split each line on single spaces ...
            JavaDStream<String> tokens = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            // ... and drop the empty tokens that blank lines and repeated or leading
            // spaces produce, so they are not reported as a word.
            JavaDStream<String> words = tokens.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String token) throws Exception {
                    return !token.isEmpty();
                }
            });
            JavaPairDStream<String, Integer> wordOnes = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
            JavaPairDStream<String, Integer> wordCounts = wordOnes.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer left, Integer right) throws Exception {
                    return left + right;
                }
            });
            // Print every (word, count) pair of each batch.
            wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception {
                    pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        @Override
                        public void call(Tuple2<String, Integer> tp) throws Exception {
                            System.out.println(tp._1+"-----------------"+tp._2);
                        }
                    });
                }
            });

            boolean interrupted = false;
            try {
                ssc.start();
                ssc.awaitTermination();
            } catch (InterruptedException e) {
                interrupted = true;
                System.err.println("Streaming job interrupted, shutting down: " + e);
            } finally {
                // Always release the streaming context AND the underlying SparkContext,
                // even when start()/awaitTermination() fails.
                ssc.stop(true);
                if (interrupted) {
                    // Restore the interrupt status only after shutdown so stop() is not cut short.
                    Thread.currentThread().interrupt();
                }
            }
        }

    }
    

      

     

     

     

  • 相关阅读:
    loadrunner12-参数化以及参数化关联
    loadrunner--vugen录制脚本提示“无Internet访问。您可能无法录制并执行业务进程”
    loadrunner--web_url函数用法
    loadrunner12-用Chrome如何录制脚本
    LoadRunner--Analysis各项指标详解
    Windows Error Code(windows错误代码详解)
    CentOS 7 (Linux) 下载百度网盘大文件
    博客园cnblogs:自定义页面风格
    Windows Server 2003 添加“Resin”到“服务”出错
    转:mysql分页原理和高效率的mysql分页查询语句
  • 原文地址:https://www.cnblogs.com/walxt/p/12759649.html
Copyright © 2011-2022 走看看