zoukankan      html  css  js  c++  java
  • sparkStream---1

    1.本地scala版

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    /**
     * Local streaming word count.
     *
     * Reads text lines from a socket on localhost:9999, splits each line on
     * single spaces, and prints the per-word counts of every 5-second batch.
     */
    object SparkStreamingDemo {
      def main(args: Array[String]): Unit = {
        // A receiver-based stream needs local[n] with n > 1; per the Spark
        // Streaming guide, one thread is taken by the receiver itself.
        val sparkConf = new SparkConf()
          .setAppName("NetworkWordCount")
          .setMaster("local[2]")

        // Streaming context with a batch interval of 5 seconds.
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))

        // socket lines -> words -> (word, 1) pairs -> counts per word within the batch
        val wordCounts = streamingContext
          .socketTextStream("localhost", 9999)
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey((left, right) => left + right)

        wordCounts.print()

        // Start the computation, then block until the context terminates.
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }

     2.java版的,本地

    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Some;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Created by Administrator on 2017/4/3.
     *
     * Stateful streaming word count: reads text lines from a socket on
     * localhost:9999 in 3-second batches, splits them on single spaces and
     * keeps a running total per word via {@code updateStateByKey}.
     */
    public class JavaSparkStreamingWordCountApp {
        /**
         * Builds and runs the streaming job until the context terminates.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the streaming computation fails
         */
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            conf.setAppName("wc");
            // Only fall back to a local master when none was supplied. Values set
            // directly on SparkConf take precedence over spark-submit flags, so an
            // unconditional setMaster("local[4]") would override "--master spark://...".
            conf.setIfMissing("spark.master", "local[4]");

            // Streaming context with a batch interval of 3 seconds.
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(3));
            try {
                // updateStateByKey requires a checkpoint directory.
                // NOTE(review): this is a local Windows path; a cluster run presumably
                // needs a directory reachable from every node - confirm before deploying.
                jsc.checkpoint("file:///d:/scala/check");

                // Discretized stream of text lines read from the socket.
                JavaReceiverInputDStream<String> sock = jsc.socketTextStream("localhost", 9999);

                // Flatten: one element per space-separated token.
                JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String str) throws Exception {
                        List<String> list = new ArrayList<String>();
                        for (String s : str.split(" ")) {
                            list.add(s);
                        }
                        return list.iterator();
                    }
                });

                // Map every word to a (word, 1) tuple.
                JavaPairDStream<String, Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

                // Fold this batch's values for a key into the total carried over
                // from earlier batches (0 when the key has no previous state).
                JavaPairDStream<String, Integer> jps = pairDS.updateStateByKey(
                        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    public Optional<Integer> call(List<Integer> batchValues, Optional<Integer> state) throws Exception {
                        Integer newCount = state.isPresent() ? state.get() : 0;

                        System.out.println("old value : " + newCount);
                        for (Integer i : batchValues) {
                            System.out.println("new value : " + i);
                            newCount = newCount + i;
                        }
                        return Optional.of(newCount);
                    }
                });

                // Aggregate by key. Kept from the original; the state stream should
                // already hold one entry per key, so this is expected to be a no-op sum.
                JavaPairDStream<String, Integer> countDS = jps.reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

                // Print the first elements of every batch.
                countDS.print();

                jsc.start();

                jsc.awaitTermination();
            } finally {
                // Release the context even when setup or awaitTermination() throws.
                jsc.stop();
            }
        }
    }

    3.集群跑。

    将文件打成jar包,放到远程机器中

    spark-submit --name wcstreaming \
                    --class com.spark.java.JavaSparkStreamingWordCountApp \
                    --master spark://s201:7077 \
                    SparkDemo1-1.0-SNAPSHOT.jar

    注:--class 后面的值是包名加类名。

  • 相关阅读:
    JAVA基础-抽象类和接口
    JAVA基础-多态
    JAVA基础-继承机制
    C++(二十七) — 深拷贝、浅拷贝、复制构造函数举例
    C++(二十六) — 构造函数、析构函数、对象数组、复制构造函数
    C++(二十五) — 类的封装、实现、设计
    C++(二十四) — 指向字符的指针为什么可以用字符串来初始化,而不是字符地址?
    C++(二十三) — 内存泄漏及指针悬挂
    C++(二十二) — 指针变量、函数指针、void指针
    C++(二十一) — 引用概念及本质
  • 原文地址:https://www.cnblogs.com/kaiwen1/p/8676407.html
Copyright © 2011-2022 走看看