zoukankan      html  css  js  c++  java
  • spark 数据分析

    //练习 Spark Streaming 监听 socket 端口

    //手写 WordCount Java 代码

    package com.swust.streaming;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.regex.Pattern;
    
    /**
     * Spark Streaming word-count demo: reads lines of text from a TCP socket, counts the
     * whitespace-separated words in each 5-second batch and prints every (word, count) pair.
     *
     * <p>Usage: {@code TestSparkStreaming [host] [port]}. Without arguments it connects to
     * {@code data005:9999}.
     */
    public class TestSparkStreaming {

        /** Host used when no host argument is supplied. */
        private static final String DEFAULT_HOST = "data005";

        /** Port used when no port argument is supplied. */
        private static final int DEFAULT_PORT = 9999;

        /** Streaming batch interval in milliseconds. */
        private static final long BATCH_INTERVAL_MS = 5000L;

        /** Runs of whitespace separating words; compiled once rather than per record. */
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");

        /**
         * Starts the streaming word count and blocks until the streaming context terminates.
         *
         * @param args optional {@code [host] [port]}; defaults to {@code data005 9999}
         * @throws NumberFormatException if a port argument is given but is not an integer
         */
        public static void main(String[] args) {
            String host = args.length > 0 ? args[0] : DEFAULT_HOST;
            int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

            SparkConf conf = new SparkConf();
            // local[2]: the socket receiver occupies one local thread, so at least one more
            // thread is needed to actually process the received batches.
            conf.setMaster("local[2]").setAppName("stream");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            // Uncomment to reduce Spark's console logging:
            // jsc.setLogLevel("error");
            JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(BATCH_INTERVAL_MS));

            // Listen on the socket; every received line becomes one record of the stream.
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

            // Word count: line -> words -> (word, 1) -> (word, total within the batch).
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    String trimmed = line.trim();
                    // Blank lines and repeated separators must not produce "" tokens,
                    // otherwise the empty string is counted as a word.
                    if (trimmed.isEmpty()) {
                        return Collections.emptyIterator();
                    }
                    return Arrays.asList(WHITESPACE.split(trimmed)).iterator();
                }
            });

            // mapToPair already returns JavaPairDStream<String, Integer>; no cast needed.
            JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });

            JavaPairDStream<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer left, Integer right) throws Exception {
                    return left + right;
                }
            });

            counts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
                @Override
                public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception {
                    // NOTE(review): RDD.foreach runs on the executors; with a non-local master
                    // these lines go to executor stdout, not the driver console.
                    pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        @Override
                        public void call(Tuple2<String, Integer> tp) throws Exception {
                            System.out.println(tp._1 + "-----------------" + tp._2);
                        }
                    });
                }
            });

            try {
                ssc.start();
                ssc.awaitTermination();
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it; shutdown follows below.
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while awaiting termination; stopping streaming context");
            } finally {
                // true: also stop the underlying SparkContext, which stop(false) left running.
                ssc.stop(true);
            }
        }
    
    }
    

      

     

     

     

  • 相关阅读:
    提高编程能力的7条建议(转载)
    1.2、Mybatis二级缓存测试
    1.1、Mybatis一级缓存测试
    外键约束
    【转】MyBatis学习总结(四)——解决字段名与实体类属性名不相同的冲突
    【转】MyBatis学习总结(三)——优化MyBatis配置文件中的配置
    【转】The content of element type "configuration" must match "(properties?,settings?,typeAliases?,typeHandlers?,objectFactory?...
    【转】MyBatis学习总结(二)——使用MyBatis对表执行CRUD操作
    【转】MyBatis学习总结(一)——MyBatis快速入门
    java面板
  • 原文地址:https://www.cnblogs.com/walxt/p/12759649.html
Copyright © 2011-2022 走看看