zoukankan      html  css  js  c++  java
  • spark 数据分析

    // 练习:使用 Spark Streaming 监听 socket 端口

    // 手写 WordCount 的 Java 代码

    package com.swust.streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * Spark Streaming practice job: counts the words received on a TCP socket.
     *
     * <p>Every batch interval the lines read from the socket are split into
     * whitespace-separated words, each word is paired with a count of 1, the
     * counts are summed per word, and every {@code word-----------------count}
     * pair of the batch is printed to standard output.
     *
     * <p>Usage: {@code TestSparkStreaming [host [port]]}. When omitted, the host
     * defaults to {@code data005} and the port to {@code 9999}.
     */
    public class TestSparkStreaming {

        /** Socket host used when no host argument is supplied. */
        private static final String DEFAULT_HOST = "data005";

        /** Socket port used when no port argument is supplied. */
        private static final int DEFAULT_PORT = 9999;

        /** Length of one streaming batch, in milliseconds. */
        private static final long BATCH_INTERVAL_MS = 5000L;

        /**
         * Builds the streaming word-count pipeline, starts it and blocks until the
         * streaming context terminates.
         *
         * @param args optional {@code [host, port]} of the socket to listen on
         * @throws NumberFormatException if a port argument is given but is not an integer
         */
        public static void main(String[] args) {
            String host = args.length > 0 ? args[0] : DEFAULT_HOST;
            int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

            SparkConf conf = new SparkConf();
            // NOTE(review): per the Spark Streaming guide a local master needs more
            // threads than receivers, hence at least 2 here (one is taken by the socket receiver).
            conf.setMaster("local[2]").setAppName("stream");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(BATCH_INTERVAL_MS));

            // Listen on the socket; each received line becomes one element of the stream.
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

            // Split every line into words.
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    // Trim first and split on whitespace runs so that blank lines and
                    // repeated/leading spaces do not produce empty "words".
                    String trimmed = line.trim();
                    if (trimmed.isEmpty()) {
                        return Arrays.<String>asList().iterator();
                    }
                    return Arrays.asList(trimmed.split("\\s+")).iterator();
                }
            });

            // Pair every word with an initial count of 1.
            JavaPairDStream<String, Integer> wordOnes = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });

            // Sum the counts per word within each batch.
            JavaPairDStream<String, Integer> wordCounts = wordOnes.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer left, Integer right) throws Exception {
                    return left + right;
                }
            });

            // Print every (word, count) pair of each batch.
            wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception {
                    pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        @Override
                        public void call(Tuple2<String, Integer> tp) throws Exception {
                            System.out.println(tp._1 + "-----------------" + tp._2);
                        }
                    });
                }
            });

            try {
                ssc.start();
                ssc.awaitTermination();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller/JVM can still observe the interruption.
                Thread.currentThread().interrupt();
            } finally {
                // Always release resources, including the underlying SparkContext,
                // even when start()/awaitTermination() fails.
                ssc.stop(true);
            }
        }

    }
    

      

     

     

     

  • 相关阅读:
    自定义的tabBarController的几种方法
    JAVA如何把一个float四舍五入到小数点后2位,4位,或者其它指定位数.
    ALAssetsLibrary使用
    UITabBarController详解
    学习笔记:Tab Bar 控件使用详解
    iOS开发 跳转场景的三种方式
    Java中文件与字节数组转换
    'NSUnknownKeyException', reason:....etValue:forUndefinedKey:]: this class is not key value coding-compliant for the key
    开源项目
    object-c的异常处理机制
  • 原文地址:https://www.cnblogs.com/walxt/p/12759649.html
Copyright © 2011-2022 走看看