zoukankan      html  css  js  c++  java
  • sparkStream---1

    1. 本地 Scala 版

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    /** Local Spark Streaming word count over text lines read from a socket at localhost:9999. */
    object SparkStreamingDemo {
      def main(args: Array[String]): Unit = {
        // The socket receiver occupies one thread, so a local master must be local[n] with n > 1
        // to leave threads for processing. setIfMissing lets a spark-submit --master override it.
        val conf = new SparkConf()
          .setIfMissing("spark.master", "local[2]")
          .setAppName("NetworkWordCount")
        // Streaming context with a 5-second batch interval.
        val ssc = new StreamingContext(conf, Seconds(5))

        // Text stream read line by line from a TCP socket.
        val lines = ssc.socketTextStream("localhost", 9999)
        // Flatten each line into words (split on single spaces).
        val words = lines.flatMap(_.split(" "))
        // Pair each word with an initial count of 1.
        val pairs = words.map((_, 1))
        // Sum the counts per word within each batch and print them.
        val counts = pairs.reduceByKey(_ + _)
        counts.print()

        // Start receiving and processing data.
        ssc.start()

        // Block until the streaming context is stopped.
        ssc.awaitTermination()
      }
    }

    2. Java 版（本地运行）

    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Some;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Created by Administrator on 2017/4/3.
     */
    public class JavaSparkStreamingWordCountApp {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf();
            conf.setAppName("wc");
            conf.setMaster("local[4]");
            //创建Spark流应用上下文
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(3));
    
            jsc.checkpoint("file:///d:/scala/check");
            //创建socket离散流
            JavaReceiverInputDStream sock = jsc.socketTextStream("localhost",9999);
            //压扁
            JavaDStream<String> wordsDS = sock.flatMap(new FlatMapFunction<String,String>() {
                public Iterator call(String str) throws Exception {
                    List<String> list = new ArrayList<String>() ;
                    String[] arr = str.split(" ");
                    for(String s : arr){
                        list.add(s);
                    }
                    return list.iterator();
                }
            });
    
            //映射成元组
            JavaPairDStream<String,Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String,Integer>(s,1);
                }
            }) ;
    
            JavaPairDStream<String,Integer> jps = pairDS.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
                    Integer newCount = v2.isPresent() ? v2.get() : 0  ;
    
                    System.out.println("old value : " + newCount);
                    for(Integer i : v1){
                        System.out.println("new value : " + i);
                        newCount = newCount +  i;
                    }
                    return Optional.of(newCount);
                }
            });
    
    
    
            //聚合
            JavaPairDStream<String,Integer> countDS = jps.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
    
            //打印
            countDS.print();
    
            jsc.start();
    
            jsc.awaitTermination();
    
            jsc.stop();
        }
    }

    3. 在集群上运行

    将项目打成 jar 包，上传到远程机器，然后用 spark-submit 提交：

    # --class takes the fully qualified main class name (package name + class name).
    spark-submit --name wcstreaming \
        --class com.spark.java.JavaSparkStreamingWordCountApp \
        --master spark://s201:7077 \
        SparkDemo1-1.0-SNAPSHOT.jar

  • 相关阅读:
    OpenLayer 3 鹰眼控件和全屏显示
    OpenLayer 3 鼠标位置坐标显示控件
    Openlayer 3 图层列表控件(自定义)
    OpenLayers 3 的地图基本操作
    小米范工具系列之十四:小米范网站批量爬虫工具
    ruby所有版本下载地址
    常用代码块:使用时间生成数据库文件名
    收集些常用的正则--以后慢慢添加
    小米范工具系列最新下载地址
    小米范工具系列之十三:小米范验证码登录爆破工具
  • 原文地址:https://www.cnblogs.com/kaiwen1/p/8676407.html
Copyright © 2011-2022 走看看