zoukankan      html  css  js  c++  java
  • spark08

    spark08

    这就是广播变量,每个executor中复用一份数据,在driver端将数据广播出去,在executor端使用

    val bd = sc.broadcast(iparr)
    val proRDD = accRDD.map(t=>{
      val province = binarySeach(t,bd.value)
      (province,1)
    })

    广播变量的内部机制

    比特洪流技术(快播,迅雷),共享资源

     

     

    广播变量实现类就是torrent实现类,就是之前用的种子,存放的数据交给每个blockManager组件进行管理,广播变量不能广播rdd,一般广播的都是小的带有规则信息的文件

    spark集群中的序列化

    object SeriaTest {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份+
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val rdd1 = rdd.map(t=>{
             //是不是每条数据使用一个对象
             val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
             val task = Thread.currentThread().getName//当前线程的名字
          val s = new Student
          (executor,task,s)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult001")
      }
    }


    object SeriaTest1 {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val s = new Student
        val rdd1 = rdd.map(t=>{
          //是不是每条数据使用一个对象
          val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
          val task = Thread.currentThread().getName//当前线程的名字
          (executor,task,s)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult002")
      }
    }

    object SeriaTest2 {
      /**
        * 1.是不是每条数据单独使用一个变量
        * 2.是不是每个task任务单独使用一份
        * 3.是不是每个executor单独使用一份
        */


      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
        val s = new Student
       val bd =  sc.broadcast(s)
        val rdd1 = rdd.map(t=>{
          //是不是每条数据使用一个对象
          val executor = InetAddress.getLocalHost.getHostName//ip地址就是executor
          val task = Thread.currentThread().getName//当前线程的名字
          (executor,task,bd.value)
        })
        rdd1.saveAsTextFile("hdfs://master:9000/srresult003")
      }
    }
    class Student extends Serializable

     

    在算子中创建对象,一条数据创建一个对象

     

    在闭包引用的时候一个task任务用一份数据

    [root@master Downloads]# hdfs dfs -cat /srresult003/part-00003

    (master02,Executor task launch worker for task 3,com.bw.spark.Student@198c75f4)

    (master02,Executor task launch worker for task 3,com.bw.spark.Student@198c75f4)

    ^[[A[root@master Downloads]# hdfs dfs -cat /srresult003/part-00001

    (master01,Executor task launch worker for task 1,com.bw.spark.Student@6df927c2)

    (master01,Executor task launch worker for task 1,com.bw.spark.Student@6df927c2)

    ^[[A[root@master Downloads]# hdfs dfs -cat /srresult003/part-00002

    (master,Executor task launch worker for task 2,com.bw.spark.Student@3adc2bae)

    (master,Executor task launch worker for task 2,com.bw.spark.Student@3adc2bae)

    [root@master Downloads]# hdfs dfs -cat /srresult003/part-00000

    (master02,Executor task launch worker for task 0,com.bw.spark.Student@198c75f4)

    (master02,Executor task launch worker for task 0,com.bw.spark.Student@198c75f4)

    广播变量中每个executor单独使用一份数据

    累加器(accumulator

    (共享变量)

    object accTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("acc")
        conf.setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd =  sc.makeRDD(Array(1,2,3,4,5,6,7,8),3)
       val acc =  sc.accumulator(0,"count")
        val acc1 = sc.longAccumulator("count")
        rdd.foreach(t=>{
          acc.add(1)
          acc1.add(1)
        })
        println(acc.value)
        println(acc1.value)
      }
    }

    计算iptest中的ip2Long所用的时长和binarySearch所用的时长?

    jdbcRDD

    object accTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("acc")
        conf.setMaster("local[*]")
        val sc = new SparkContext(conf)
        val url = "jdbc:mysql://master:3306/spark?characterEncoding=utf8"
        val username = "root"
        val password = "123456"

    //    sc: SparkContext,
    //    getConnection: () => Connection,
    //    sql: String,
    //    lowerBound: Long,
    //    upperBound: Long,
    //    numPartitions: Int,
    //    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _
        val rdd = new JdbcRDD[(String,Int)](
           sc,
          ()=>DriverManager.getConnection(url,username,password),
          "select * from visit_log where id>= ? and id <=?",
          1,
          5,
          2,
          rs=>{
            val name = rs.getString("name")
            val count = rs.getInt("count")
            (name,count)
          }
          )
        //select * from visit_log where id >1 and id<2
        //select * from visit_log where id>3 and id<5

        rdd.foreach(println)
      }
    }

    spark On yarn

    配置yarn资源管理平台

    capacity-scheduler.xml 文件:

    1.

     

    2.

    <property>

    <name>yarn.scheduler.capacity.resource-calculator</name>

    <!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->

    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>

    </property>

    3.修改所有 yarn 节点的 yarn-site.xml,在该文件中添加如下配置

    4.

     

    <property>

     <name>yarn.nodemanager.pmem-check-enabled</name>

     <value>false</value>

    </property>

    <property>

     <name>yarn.nodemanager.vmem-check-enabled</name>

     <value>false</value>

    </property>

    以上是修改yarn中的配置、

    5.测试

     

    6.

    spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /root/Downloads/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 10

     spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /root/Downloads/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 10

    spark任务在yarn上面的运行提交命令

    7.

     

    在提交任务到yarn中的时候,一定要知道提交的yarn的集群地址,所以要在spark-env.sh中配置一个开关地址,设定yarn的配置文件位置

    8.

     

     

    yarn的监控页面中的8088端口可以查看任务,点击applicationMaster就可以查看任务的监控页面

    spark任务提交到yarn的集群中的时候存在两种模式,--cluster以集群形式运行 --client客户端形式运行

    cluster模式下任务是运行在集群中的,所有的结果都分布在不同的contianer中,所以不会展示出来,想要查看日志,必须将所有日志全部总结在一起,才能得到结果

    yarn logs -applicationId id

    客户端模式可以直接将结果展示到控制台中

    不管是在standalone集群中还是yarn的集群中,运行的时候都要产生executordriver

    sparkPi的结果数据一定在drvier中,能展示到控制台中整名clientdriver在一起

    在集群模式cluster模式中,结果没有办法展示,所以driver不在客户端,driverappMaster在一起

    如果提交任务后将client端关闭,cluster模式不影响  client模式会停止运行

    yarn中的spark-shell

    Error: Cluster deploy mode is not applicable to Spark shells.

    Run with --help for usage help or --verbose for debug output

    spark-shell只能运行在client模式中

    client模式下  

    CoarseGrainedExecutorBackEnd executor进程

    spark-submit;在哪提交的任务,哪里就存在这个进程

    executorLauncher:这个就是client模式的appMaster

    cluster模式

    CoarseGrainedExecutorBackEnd executor进程

    spark-submit;在哪提交的任务,哪里就存在这个进程

    appMaster == driver

    spark standalone集群的HA

     

    export

    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER

    -Dspark.deploy.zookeeper.url=master:2181,master01:2181,master02:2181

    -Dspark.deploy.zookeeper.dir=/spark"

    在每个机器的spark-env.sh中将master的地址去掉

    增加上面的zk内容

    多启动一个master就可以了

  • 相关阅读:
    ArrayList用法
    MessageBox
    将文本文件导入Sql数据库
    在桌面和菜单中添加快捷方式
    泡沫排序
    Making use of localized variables in javascript.
    Remove double empty lines in Visual Studio 2012
    Using Operations Manager Connectors
    Clear SharePoint Designer cache
    Programmatically set navigation settings in SharePoint 2013
  • 原文地址:https://www.cnblogs.com/JBLi/p/11528423.html
Copyright © 2011-2022 走看看