zoukankan      html  css  js  c++  java
  • 最最简单的~WordCount¬

    sc.textFile("hdfs://....").flatMap(line =>line.split(" ")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)
    

    不使用reduceByKey

    sc.textFile("hdfs://....").flatMap(l=>l.split(" ")).map(w=>(w,1)).groupByKey().map((p:(String,Iterable[Int]))=>(p._1,p._2.sum)).collect
    

    步骤1:textFile先生成HadoopRDD,然后再通过map操作生成MappedRDD.

    结果:res0:org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

    步骤2:val split = line =>line.split(" ")).flatMap(line => line.split(" ")) flatMap将原来的MappedRDD转换为FlatMappedRDD

    步骤3:val wordCount = split.map(w =>(w,1)) 利用w生成相应的键值对,上一步的FlatMappedRDD被转换为MappedRDD

    步骤4:val reduce = wordCount.reduceByKey(_+_)

    步骤5:reduce.foreach(println) 触发执行  

     在执行foreach时,调用了runJob函数,实现了重载。 Final RDD和作用于RDD上的Function。 然后读取Finall RDD的分区数,通过allowLocal来表示是否在Standalone模式下执行。

    从spark-shell到sparkContext的创建的调用路径:

    spark-shell -> spark-submit ->spark-class->sparkSubmit.main ->SparkILoop -> createSparkContext

    SpackContext初始化过程中 传入的入参是SparkConf

    一、根据初始化生成SparkConf,再根据SparkConf来创建SparkEnv.

    二、创建TaskScheduler,根据Spark的运行模式选择相应的SchedulerBackend,同时启动TaskScheduler

    private[spark] var taskScheduler = SparkContext.createTaskScheduler(this,master,appName)
    taskScheduler.start()
    

     createTaskScheduler最为关键,根据master环境变量来判断Spark当前的部署方式,从而生成相应的SchedulerBackend的不同子类。taskScheduler.start的目的是启动相应的SchedulerBackend.

    三、从上一步创建的taskScheduler实例为入参创建DAGScheduler并启动运行。

    private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
    dagScheduler.start()
    

    四、启动WebUI.

    ui.start()
    

      

  • 相关阅读:
    vue项目接入百度地图
    angularJS 十六进制与字符串相互转换
    angular项目实现mqtt的订阅与发布 ngx-mqtt
    消息中间件MQTT
    Zigbee 与 WiFi 的区别
    angular6 路由拼接查询参数如 ?id=1 并获取url参数
    SpringBoot拦截器
    SpringBoot定时任务
    SpringBoot 各层之间的关系
    百度离线地图 —— 瓦片地图下载
  • 原文地址:https://www.cnblogs.com/yangsy0915/p/4899486.html
Copyright © 2011-2022 走看看