zoukankan      html  css  js  c++  java
  • spark08

    spark08

    这就是广播变量,每个executor中复用一份数据,在driver端将数据广播出去,在executor端使用

    val bd = sc.broadcast(iparr)
    val proRDD = accRDD.map(t=>{
      val province = binarySeach(t,bd.value)
      (province,1)
    })

    广播变量的内部机制

    比特洪流技术(快播,迅雷),共享资源

     

     

    广播变量实现类就是torrent实现类,就是之前用的种子,存放的数据交给每个blockManager组件进行管理,广播变量不能广播rdd,一般广播的都是小的带有规则信息的文件

    spark集群中的序列化

    object SeriaTest {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份+
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val rdd1 = rdd.map(t=>{
             //是不是每条数据使用一个对象
             val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
             val task = Thread.currentThread().getName//当前线程的名字
          val s = new Student
          (executor,task,s)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult001")
      }
    }


    object SeriaTest1 {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val s = new Student
        val rdd1 = rdd.map(t=>{
          //是不是每条数据使用一个对象
          val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
          val task = Thread.currentThread().getName//当前线程的名字
          (executor,task,s)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult002")
      }
    }

    object SeriaTest2 {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val s = new Student
       val bd =  sc.broadcast(s)
        val rdd1 = rdd.map(t=>{
          //是不是每条数据使用一个对象
          val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
          val task = Thread.currentThread().getName//当前线程的名字
          (executor,task,bd.value)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult003")
      }
    }
    class Student extends Serializable

     

    在算子中创建对象,一条数据创建一个对象

     

    在闭包引用的时候一个task任务用一份数据

    [root@master Downloads]# hdfs dfs -cat /srresult003/part-00003

    (master02,Executor task launch worker for task 3,com.bw.spark.Student@198c75f4)

    (master02,Executor task launch worker for task 3,com.bw.spark.Student@198c75f4)

    ^[[A[root@master Downloads]# hdfs dfs -cat /srresult003/part-00001

    (master01,Executor task launch worker for task 1,com.bw.spark.Student@6df927c2)

    (master01,Executor task launch worker for task 1,com.bw.spark.Student@6df927c2)

    ^[[A[root@master Downloads]# hdfs dfs -cat /srresult003/part-00002

    (master,Executor task launch worker for task 2,com.bw.spark.Student@3adc2bae)

    (master,Executor task launch worker for task 2,com.bw.spark.Student@3adc2bae)

    [root@master Downloads]# hdfs dfs -cat /srresult003/part-00000

    (master02,Executor task launch worker for task 0,com.bw.spark.Student@198c75f4)

    (master02,Executor task launch worker for task 0,com.bw.spark.Student@198c75f4)

    广播变量中每个executor单独使用一份数据

    累加器(accumulator

    (共享变量)

    object accTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("acc")
        conf.setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),3)
       val acc =  sc.accumulator(0,"count")
        val acc1 = sc.longAccumulator("count")
        rdd.foreach(t=>{
          acc.add(1)
          acc1.add(1)
        })
        println(acc.value)
        println(acc1.value)
      }
    }

    计算iptest中的ip2Long所用的时长和binarySearch所用的时长?

    jdbcRDD

    object accTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("acc")
        conf.setMaster("local[*]")
        val sc = new SparkContext(conf)
        val url = "jdbc:mysql://master:3306/spark?characterEncoding=utf8"
        val username = "root"
        val password = "123456"

    //    sc: SparkContext,
    //    getConnection: () => Connection,
    //    sql: String,
    //    lowerBound: Long,
    //    upperBound: Long,
    //    numPartitions: Int,
    //    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _
        val rdd = new JdbcRDD[(String,Int)](
           sc,
          ()=>DriverManager.getConnection(url,username,password),
          "select * from visit_log where id>= ? and id <=?",
          1,
          5,
          2,
          rs=>{
            val name = rs.getString("name")
            val count = rs.getInt("count")
            (name,count)
          }
          )
        //select * from visit_log where id >1 and id<2
        //select * from visit_log where id>3 and id<5

        rdd.foreach(println)
      }
    }

    spark On yarn

    配置yarn资源管理平台

    capacity-scheduler.xml 文件:

    1.

     

    2.

    <property>

    <name>yarn.scheduler.capacity.resource-calculator</name>

    <!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->

    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>

    </property>

    3.修改所有 yarn 节点的 yarn-site.xml,在该文件中添加如下配置

    4.

     

    <property>

     <name>yarn.nodemanager.pmem-check-enabled</name>

     <value>false</value>

    </property>

    <property>

     <name>yarn.nodemanager.vmem-check-enabled</name>

     <value>false</value>

    </property>

    以上是修改yarn中的配置、

    5.测试

     

    6.

    spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /root/Downloads/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 10

     spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /root/Downloads/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 10

    spark任务在yarn上面的运行提交命令

    7.

     

    在提交任务到yarn中的时候,一定要知道提交的yarn的集群地址,所以要在spark-env.sh中配置一个开关地址,设定yarn的配置文件位置

    8.

     

     

    yarn的监控页面中的8088端口可以查看任务,点击applicationMaster就可以查看任务的监控页面

    spark任务提交到yarn的集群中的时候存在两种模式,--cluster以集群形式运行 --client客户端形式运行

    cluster模式下任务是运行在集群中的,所有的结果都分布在不同的contianer中,所以不会展示出来,想要查看日志,必须将所有日志全部总结在一起,才能得到结果

    yarn logs -applicationId id

    客户端模式可以直接将结果展示到控制台中

    不管是在standalone集群中还是yarn的集群中,运行的时候都要产生executordriver

    sparkPi的结果数据一定在drvier中,能展示到控制台中整名clientdriver在一起

    在集群模式cluster模式中,结果没有办法展示,所以driver不在客户端,driverappMaster在一起

    如果提交任务后将client端关闭,cluster模式不影响  client模式会停止运行

    yarn中的spark-shell

    Error: Cluster deploy mode is not applicable to Spark shells.

    Run with --help for usage help or --verbose for debug output

    spark-shell只能运行在client模式中

    client模式下  

    CoarseGrainedExecutorBackEnd executor进程

    spark-submit;在哪提交的任务,哪里就存在这个进程

    executorLauncher:这个就是client模式的appMaster

    cluster模式

    CoarseGrainedExecutorBackEnd executor进程

    spark-submit;在哪提交的任务,哪里就存在这个进程

    appMaster == driver

    spark standalone集群的HA

     

    export

    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER

    -Dspark.deploy.zookeeper.url=master:2181,master01:2181,master02:2181

    -Dspark.deploy.zookeeper.dir=/spark"

    在每个机器的spark-env.sh中将master的地址去掉

    增加上面的zk内容

    多启动一个master就可以了

  • 相关阅读:
    一个C++程序员学习C#语言
    C#入门教程笔记
    完全卸载mysql 停止服务、卸载相关程序、删除注册表
    C++结构简介
    babun,windows shell
    无限极设计以及随意移动节点(树结构)
    springboot 配置访问外部静态资源详解
    mysql8+keepalived 双主高可用搭建
    mysql 双主复制搭建
    mysql 主备搭建
  • 原文地址:https://www.cnblogs.com/JBLi/p/11528423.html
Copyright © 2011-2022 走看看