zoukankan      html  css  js  c++  java
  • sparkStream---1

    1.本地scala版

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    /** Minimal local Spark Streaming word count fed by a TCP text socket. */
    object SparkStreamingDemo {

      /**
       * Builds the streaming pipeline, starts it and blocks until it terminates.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        // Local mode as local[n] with n > 1, per the original note.
        val sparkConf = new SparkConf()
          .setAppName("NetworkWordCount")
          .setMaster("local[2]")

        // Streaming context with a batch interval of 5 seconds.
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))

        // Socket text stream -> words -> (word, 1) pairs -> per-batch totals.
        val wordCounts = streamingContext
          .socketTextStream("localhost", 9999)
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey((left, right) => left + right)

        wordCounts.print()

        // Start the computation, then wait for it to finish.
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }

    2.本地java版

    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Some;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Created by Administrator on 2017/4/3.
     */
    public class JavaSparkStreamingWordCountApp {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            conf.setAppName("wc");
            conf.setMaster("local[4]");
            //创建Spark流应用上下文
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(3));
    
            jsc.checkpoint("file:///d:/scala/check");
            //创建socket离散流
            JavaReceiverInputDStream sock = jsc.socketTextStream("localhost",9999);
            //压扁
            JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String,String>() {
                public Iterator call(String str) throws Exception {
                    List<String> list = new ArrayList<String>() ;
                    String[] arr = str.split(" ");
                    for(String s : arr){
                        list.add(s);
                    }
                    return list.iterator();
                }
            });
    
            //映射成元组
            JavaPairDStream<String,Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String,Integer>(s,1);
                }
            }) ;
    
            JavaPairDStream<String,Integer> jps = pairDS.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
                    Integer newCount = v2.isPresent() ? v2.get() : 0  ;
    
                    System.out.println("old value : " + newCount);
                    for(Integer i : v1){
                        System.out.println("new value : " + i);
                        newCount = newCount +  i;
                    }
                    return Optional.of(newCount);
                }
            });
    
    
    
            //聚合
            JavaPairDStream<String,Integer> countDS = jps.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
    
            //打印
            countDS.print();
    
            jsc.start();
    
            jsc.awaitTermination();
    
            jsc.stop();
        }
    }

    3.集群跑。

    将文件打成jar包,放到远程机器中

    # --class 的值是完整的包名加类名
    spark-submit --name wcstreaming \
                    --class com.spark.java.JavaSparkStreamingWordCountApp \
                    --master spark://s201:7077 SparkDemo1-1.0-SNAPSHOT.jar

  • 相关阅读:
    IOS Charles(代理服务器软件,可以用来拦截网络请求)
    Javascript中addEventListener和attachEvent的区别
    MVC中实现Area几种方法
    Entity Framework Code First 中使用 Fluent API 笔记。
    自定义JsonResult解决 序列化类型 System.Data.Entity.DynamicProxies 的对象时检测到循环引用
    序列化类型 System.Data.Entity.DynamicProxies 的对象时检测到循环引用
    An entity object cannot be referenced by multiple instances of IEntityChangeTracker 的解决方案
    Code First :使用Entity. Framework编程(8) ----转发 收藏
    Code First :使用Entity. Framework编程(6) ----转发 收藏
    Code First :使用Entity. Framework编程(5) ----转发 收藏
  • 原文地址:https://www.cnblogs.com/kaiwen1/p/8676407.html
Copyright © 2011-2022 走看看