zoukankan      html  css  js  c++  java
  • sparkStream---1

    1.本地scala版

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    /** Local word count over a socket text stream using Spark Streaming. */
    object SparkStreamingDemo {
      def main(args: Array[String]): Unit = {
        // Local master must be local[n] with n > 1.
        val sparkConf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("NetworkWordCount")
        // Streaming context with a 5-second batch interval.
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))

        // Read text lines from localhost:9999, flatten them into words,
        // pair every word with 1 and sum the ones per word within each batch.
        val wordCounts = streamingContext
          .socketTextStream("localhost", 9999)
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey((left, right) => left + right)
        wordCounts.print()

        // Start the computation, then block until it terminates.
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }

     2.java版的,本地

    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Some;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Created by Administrator on 2017/4/3.
     */
    public class JavaSparkStreamingWordCountApp {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            conf.setAppName("wc");
            conf.setMaster("local[4]");
            //创建Spark流应用上下文
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(3));
    
            jsc.checkpoint("file:///d:/scala/check");
            //创建socket离散流
            JavaReceiverInputDStream sock = jsc.socketTextStream("localhost",9999);
            //压扁
            JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String,String>() {
                public Iterator call(String str) throws Exception {
                    List<String> list = new ArrayList<String>() ;
                    String[] arr = str.split(" ");
                    for(String s : arr){
                        list.add(s);
                    }
                    return list.iterator();
                }
            });
    
            //映射成元组
            JavaPairDStream<String,Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String,Integer>(s,1);
                }
            }) ;
    
            JavaPairDStream<String,Integer> jps = pairDS.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
                    Integer newCount = v2.isPresent() ? v2.get() : 0  ;
    
                    System.out.println("old value : " + newCount);
                    for(Integer i : v1){
                        System.out.println("new value : " + i);
                        newCount = newCount +  i;
                    }
                    return Optional.of(newCount);
                }
            });
    
    
    
            //聚合
            JavaPairDStream<String,Integer> countDS = jps.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
    
            //打印
            countDS.print();
    
            jsc.start();
    
            jsc.awaitTermination();
    
            jsc.stop();
        }
    }

    3.集群跑。

    将文件打成jar包,放到远程机器中

    # --class takes the fully qualified name (package + class) of the entry point.
    spark-submit --name wcstreaming \
        --class com.spark.java.JavaSparkStreamingWordCountApp \
        --master spark://s201:7077 \
        SparkDemo1-1.0-SNAPSHOT.jar

  • 相关阅读:
    软件测试经理工作职责
    测试经理的年终总结
    从0到1,如何快速搭建人才梯队?
    Centos 的常用命令总结
    Docker的常用命令总结
    Jenkins Android项目编译配置(完整版)
    linux CentOS 权限问题修复(chmod 777 -R 或者chmod 755 -R问题修复)
    基于gitlab的项目管理流程
    js 防抖动、重复提交、频繁点击
    GPUImage移植总结
  • 原文地址:https://www.cnblogs.com/kaiwen1/p/8676407.html
Copyright © 2011-2022 走看看