zoukankan      html  css  js  c++  java
  • 大数据学习——spark笔记

    变量的定义

    val a: Int = 1
    var b = 2
    

    方法和函数

    区别:函数可以作为参数传递给方法
    方法:
            def test(arg: Int): Int=>Int ={
                方法体
            }
            val fun = (test _: Int =>(Int=>Int))=>函数体
    

    逻辑执行语句

    val a = if(条件){
    
    执行逻辑
    返回值
    }else{
    执行逻辑
    }
    
    while(条件){
    执行逻辑
    }
    
    val arr = Array(1,2,3,4,5)  
    for(i <- 0 to arr.length ){
        arr(i)
    }
    
    for(i <- arr){
        i
    }
    

    集合操作

    Array ArrayBuffer List ListBuffer set Map tuple
    
    val arr = Array(1,2,3,4,5)
        arr(0)
        arr += 9
    val arrb = ArrayBuffer(1,2,3,4,5)
        arrb(0)
    val list = List(1,2,3,4)
    
    val tuple = (1,"string")
    tuple._1
    
    val map = Map("a"->1)
    val map = Map(("a",1))
    

    类(重要)

    类的主构造器:主构造器里面的变量会被执行,方法会被加载,调用的方法会被执行
    calss Test(){
        val int = 1
        def test(){
        }
    
        …………
        …………
        …………
        test
    
    }
    
    辅助构造器:重载
    
    extends with
    

    集合的高级操作(重要)

    map:将集合中的变量循环出来做操作
    flatMap:将集合中的参数压循环出来做操作
    val arr = Array("hello tom","hello lilei","hello hanmeimei")
    map:(hello tom),(hello lilei),(hello hanmeimei) 
    flatMap:(hello tom hello lilei hello hanmeimei)
    filter:过滤想要的元素
    groupBy:按照key进行分组,分组之后value合并到Array
    mapValues:针对kv类型的数据,只对value进行操作
    sortBy:针对某个元素进行排序
    val arr Array("hello tom","hello lilei")
    val result =  arr.flatMap(x => x.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortBy(_._2).recerse
    val result = arr.flatMap(_.split(" ")).map((_,1)),reduceByKey(_+_).sortBy(_._2,true)
    

    高级特性

    高阶函数:把函数作为参数传递给方法或者函数,函数在函数式编程中是第一位的。
        map(函数)
    
    隐式转换(PreDef):对类的增强,Int类没有to这个方法,然后再RichInt类中包含这个方法,我们只需要在某个地方将Int转换成RichInt,然后在用的地方import就ok了
    class RichFile(file: File){
    def read(file:File):String={
        Source.fromFile(file.getPath).mkString
    }
    
    }
    object RichFile{
    
    implict def file2RichFile(file:File)=RichFile(file)
    }
    
    object Test{
    def main(args:Array[String]){
        import RichFile.file2RichFile
        val file = new File("c://words.txt").read
    }
    
    }
    
    
    柯里化:将原来接收多个参数的方法或者函数,编程接收一个一个的方法或者函数,返回的是函数
    
        def test(a:Int)(b:Int)(c:Int){
            a+b+c
        }
        val fun = def(1) _
    

    actor 并发编程的接口(非常重要)

    actor:用消息传递的方式实现了并发编程,写起来像线程,玩起来像socket
    AKKA:actorSystem actOf
    

    spark(what、how、why、use、运维<源码的理解>)

    课程目标

    1、知道spark是干啥的
    2、会安装spark
    3、会写spark程序(scala、python、R、java)
    

    什么是spark?

    内存迭代式计算,利用DAG有向无环图
    特别非常快:在硬盘快mr10x,在内存,落你一条街100x
    易用性:代码写的少,可以用n中语言写,你mr就一种
    通用性:我集成了core、sql、streaming、MLlib、graphx,能交互
    无处不在:数据源多种(hdfs、hbase、mysql、文件),计算平台多种(standalone、YARN、mesos)
    

    how1(部署)

    1、下载安装包
    2、上传包
    3、解压
    4、重命名
    5、修改环境变量
    6、修改配置文件(重要,去官方文档看(别人的帖子,例如:www.wangsenfeng.com)、所有集群跑不起来都在这,通过log文件查看)
    7、下发(scp)
    8、修改其他机器的配置(可选)
    9、格式化(可选)
    10、启动集群(注意依赖关系)
    

    启动

    方式1:
        standalone-单master:
                            java_home、masterip、masterport、hadoopconf
    方式2:
        standalone-多master:
                            java_home、masterport、hadoopconf、zookeeper
    

    运行shell

    运行spark-shell的两种方式:
    1、直接运行spark-shell
        单机通过多线程跑任务,只在运行spark-shell的地方运行一个进程叫sparksubmit
    2、运行spark-shell --master spark://master1:7077
        将任务运行在集群中,在运行spark-shell的机器上运行sparksubmit进程,运行executor在worker上
    

    用api开发spark代码

    1、创建项目
    2、到pom.xml(在day01中)
    3、创建scala类
        import org.apache.spark.SparkContext //一切任务的起源,所有的计算的开头。(上下文)
        import org.apache.spark.SparkConf   //spark的配置信息,相当于mr当中的那个conf,他会覆盖掉默认的配置文件(如果你进行了配置),他的主要作用,这只app的名字,设置时运行本地模式还是集群模式
    4、写代码(参考官方文档)
        如果是在windows上运行,设置setMaster("local[n]")
        如果是线上运行,把setMaster("local[n]")去掉,或者setMaster("spark://master1:7077")(不建议)
        注意两个关键词:transformation,action
    

    提交任务到集群

    1、打jar包,去掉setMaster
    2、将jar上传到linux
    3、执行命令 
        spark-submit 
        --master spark://master1:7077 
        --executor-memory 512M 
        --total-executor-cores 2 
        --class org.apache.spark.WordCount 
        xxx.jar 
        in     
        out 
    

    用 python开发spark程序

    1、开发python的程序
    2、运行在集群,用spark-submit
    

    用R开发spark

    1、先安装R
        yum –y install gcc gcc-c++,
        yum –y install gcc-gfortran
        yum –y install readline-devel
        yum –y install libXt-devel
        yum –y install fonts-chinese tcl tcl-devel tclx tk tk-devel
        yum -y install epel-release
        vim /etc/yum.repos.d/epel.repo
        将
        #baseurl
        mirrorlist
        改成
        baseurl
        #mirrorlist
        yum -y install R 安装R语言
    
        2、然后按照官网的玩
    
        单机启动
         sparkR
    
        启动standalone
        sparkR --master spark://master1:7077
    
        启动yarn
        sparkR --master yarn-client 
    
        从hive读数据等
         sparkR --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar
    
        集群提交
        spark-submit examples/src/main/r/dataframe.R
    
        3、监控
        http://master1ha:4040/
    

    思考问题

    1、什么是RDD
    2、什么是stage
    3、什么是DAG
    

    随堂问题

    1、老师好,刚刚那个mr的container,是由resourceManager创建好,然后序列化后,再给NodeManager那些来反序列化的吗?
    答:是由resourcemanager创建好序列化发给applicationMaster,然后applicationMaster找nodemanager去启动资源
    2、老师,刚才那个执行结果分成两个文件,它的分区机制是将不同的单词进行hash 吗?
    答:是的,hash分区
    3、在集群上,R运行需要安装R,Python文件运行,需要安装Python么?
    答:需要安装,linux默认帮我们安装了python
    

    复习

    什么spark?

    内存迭代式计算,每个算子将计算结果保存在内存中,其他算子,读取这个结果,继续计算
    4个特性:快(10x、100x),易用性(代码优美、可以用4种语言开发依赖外部数据源(hdfs、本地文件、kafka、flume、mysql))、
    通用性(cores、sql、streaming、MLlib、graphx,交互使用)、随便那个平台都可以跑(standalone、yarn、mesos)
    

    搭建spark

    一主多从:
            1、下载安装包(依赖的hadoop的版本,source是下载源码的)
            2、上传到集群
            3、解压
            4、重命名(版本更新不需要修改环境变量)
            5、修改环境变量(root)
            6、修改配置文件(spark-env.sh:JAVA_HOME,master_ip,master_port,hadoop_conf_dir、java_opts(-D);slaves(从的域名))
            7、下发(scp)
            8、启动集群(start-all.sh;start-master.sh;start-slave.sh master的地址)
            9、spark的协议:spark://master:7077
            10、浏览器端口:master:8080
            11、R语言的浏览器任务查看:masterR:4040
    
    多主多从:多加了zookeeper调度(选举机制)
    

    命令行

    1、spark-shell:在当台机器上启动一个进程sparksubmit,通过多线程的方式模拟集群
    2、spark-shell --master spark://master1:7077:启动的事集群版shell,任务会提交到集群运行,
        在当前的机器启动的集成sparksubmit,在丛集器启动的集成叫xxxxexecutorbackend
        默认没有加从机器的cores和memory参数,会在每台丛集器启动一个executor进程,
        如果加了--total-executor-cores n会启动n个executor进程
    

    命令行版的wordcount

    注意:在sparkshell中帮我们默认加载了SparkContext,并命名为sc;也帮我们创建了SparkConf,并且设置了appname(“sparkshell”),并且设置了setmaster(“local/spark://...”)
    sc.textFile("file:///... ; hdfs://...").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).saveAsTextFile("hdfs://...")
    

    spark的api操作

    1、scala
    2、python
        #!/usr/bin/python
        from pyspark import SparkConf , SparkContext
        sc.textFile("hdfs://...").flatMap(lambda x: x.split(" ")).map(lambda y:(y,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://...")
    3、R
        ......
    4、java
        ......
    

    RDD

    目标

    1、什么是RDD?
    2、RDD的创建方式和依赖关系
    3、DAG有向无环图的意义
    4、掌握划分stage的过程
    5、掌握RDD的所有操作!!!!
    

    什么是RDD?

    RDD(Resilient Distributed Datasets )定义为弹性的分布式数据集,包含了只读的、分区的、分布式计算的概念;RDD是个类
    1、一个数据分区的列表(hdfs的所有数据块的位置信息,保存在我RDD类成员变量Array中)
    2、保存了在数据块上的计算方法,这个计算方法会应用到每一个数据块上
    3、一个对于其他RDD的依赖,是一个集合,spark就是通过这种依赖关系,像流水线一样处理我们的数据,
        当某个分区的数据计算失败,只需要根据流水线的信息,重新计算这一个分区的数据即可,不需要计算全部数据
    4、分区方式(partitioner),决定RDD数据来源的分区和数据计算后的分区:hashpartitioner;rangepartitioner
    5、位置相关性(hdfs)
    

    如何创建RDD

    1、通过序列化集合的方式创建RDD(parallelize,makeRDD)
    2、通过读取外部的数据源(testFile)
    3、通过其他的rdd做transformation操作转换成新的RDD
    
    RDD的两钟算子:
    1、transformation:
        通过算法对RDD进程转换,延迟加载的一个处理数据及的方法:
        map flatMap reduceByKey 
    2、Action:
        触发整个job进行计算的算子
        collect top first saveAsTextFile
    

    广播(broadcast)变量

    :其广泛用于广播Map Side Join中的小表,以及广播大变量等场景。这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。
    Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。相比Hadoop的distributed cache,广播的内容可以跨作业共享。
    Broadcast的底层实现采用了BT机制
    

    ipLocation

    1、广播变量
    2、ip转long(分金定穴循八卦,toolong插棍左八圈)
    3、二分法查找:(上下循环寻上下,左移右移寻中间)
    4、分区存数据库(foreachPartition)
    

    作业:

    1、把所有的算子运行一遍
    2、把iplocation的思想理解,代码运行
    3、有富余时间的情况下,敲一个iplocation就行了
    

    问题

    1、每个stege是作为一个任务整体,序列化后发送给一台机器反序列话执行吗?里面包含的多个RDD是串联起来工作的吗?
    答:是的
    2、MapReduce中MRappmaster,启动mapTask的时候,那个map类实例是不是已经序列化并且被包含在ResourceManager的任务队列中的任务对象中?
    答:是的
    3、老师,只有对于于key-value的RDD,才会有Partitioner,怎么理解??
    答:kv型数据的RDD按照Key进行分组操作,非kv的数据不需要分组操作,因为没有响应的算子提供
    4、讲解RDD的时候,可以把跟综进源码的路径加在笔记里吗?希望可以在阅读源码的基础上理解RDD
    答:通过crtl+shift+R打开源码RDD.scala就能查看了
    5、还有那个分片的工作,是任务提交之前就做好了吧,MapReduce的job.split文件好像就是在任务提交之前就在客户端通过fileinputformat已经分好了,然后再发送到HDFS上
    答:对的,我们的分片也是做好了之后发送任务
    6、getPartitions方法在整个运行过程中总共会调用几次? 数据都是分开运行的吗? 如果是分开运行的,那只需要在第一个MapRDD调用一次。请问这样理解对吗?
    答:getPartitions是在任务开始之前调用一次,拿出分区的地址进行分发任务
    

    复习

    1、什么是RDD
        一个分区的列表(FileSplit),决定读取的文件在哪
        一个应用在每个分区上的算子
        一个对其他RDD的依赖集合
        可选:一个决定数据存储时的分区方式
        可选:如果在yarn上运行,决定数据本地运行的方式,移动数据不如移动计算
    2、如何创建RDD
        1、通过序列化集合的方式(makeRDD、parallelize)
        2、通过读取文件的方式
        3、通过其他的RDD进行transformation转换而来
    3、RDD的算子
        transformation:(懒加载)
        map、flatMap、filter、mapPartition、groupByKey、reduceByKey、union、intersaction、distinct、aggregateByKey
        Action:(触发任务的进行)
        top、take、first、count、collect、foreach、savaAsTextFile、reduce
    
    4、iplocaltion:(ip的热力图)
        1、广播变量:共享的内存,只读的,只能追加的
        2、ip转long:分金定穴循八卦、toolong插棍左八圈
        3、二分法查找:上下循环寻上下,左移右移寻中间
        4、foreachPartition:对每个分区的数据进行操作,可以在分区操作的时候创建外部链接(jedis、mysql、hbase)
    

    目标:

    1、掌握RDD的stage划分
    2、掌握DAG概念
    3、学会使用如何创建RDD的缓存
    4、学会使用如何创建RDD的checkpoint
    

    RDD的依赖关系

    宽依赖:依赖的RDD产生的数据不只是给我用的。父RDD不只包含一个子RDD的数据(多对对),非独生子女
    窄依赖:依赖的RDD产生的数据只给我自己。父RDD只包含一个子RDD的数据(一对一、一对多)。独生子女
    Lineage(血统):RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。
            RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
    

    找依赖关系划分stage的目的

    1、如何通过stage的划分设置缓存
        1、在窄依赖想设置缓存的时候,用cache
        2、在宽依赖想设置缓存的话,用checkpoint
    

    如何设置cache和checkpoint

    cache:
            someRDD.cache():将缓存放到内存中
            someRDD.persist(StorageLevel.MEMORY_AND_DISK):根据自己的需要设置缓存的位置(内存和硬盘)
    
    checkPoint:可以吧RDD计算后的数据存储在本地的磁盘,也可以是hdfs
            sc.setCheckpointDir("hdfs://master1:9000/ceshi/checkpoint")//设置checkpoint的路径
            someRDD.checkpoin()
    

    什么时候设置缓存,什么时候设置checkpoint

    遇到宽依赖设置checkpoint,窄依赖想缓存的话设置cache
    

    cache 和 checkpoint的区别?

    cache只是缓存数据,不改变RDD的依赖关系
    checkpoint是生成了一个新的RDD,后面的RDD依赖的关系已经改变。
    
    checkpoint--》cache--》重算
    

    四个案例

    1、pv:点击率
    2、uv:在线用户数
    3、topk:微博热门词汇
    4、moblelocation:统计家庭位置和工作位置
    

    什么是spark-sql

    相当于hive
    

    书写代码的两种模式

    datafream:spark-sql自己的语法
    
    sql:spark-sql集成sql的语法
    1、通过sc加载任意类型数据
    2、创建case class Person(id:Int , name:String , age:Int)(表结构)
    3、将数据添加到表结构中map
    4、注册表
    5、通过sqlContext.sql()
    

    spark-sql的api

    两种模式(表的schema加载的两种模式)
    1、通过case class的方式加载表结构
    2、通过StructType去自己定义表结构
    

    作业

    1、moblelocation回去运行一遍,如果有富余时间敲几遍
    2、把sparl-sql的命令行和代码敲一遍
    

    复习

    RDD的依赖关系

    1、宽依赖(多对多)
    2、窄依赖(一对一 和 多对一)
    

    通过宽依赖和窄依赖划分stage

    1、遇到宽依赖,宽依赖到上一个宽依赖之间的所有窄依赖是一个stage
    2、stage之间有包含关系
    

    划分stage的目的

    1、用来划分task
    2、用来指导什么地方需要设置什么样的缓存(cache、checkpoint)
    

    如何设置缓存

    1、someRDD.cache()
    2、someRDD.persist(StorageLeavel.MEMORY_AND_DISK_2)
    3、sc.setCheckPointDir("hdfs://...")
        someRDD.checkpoint()
    

    DAG

    一个任务组成的流水线就是DAG(DAGscheduler)
    DAG可以划分成n个stage
    stage对应n个RDD
    把stage封装成Task(stage),把task分发下去(TaskScheduler)
    

    PV UV topK

    pv:点击率
    sc.textFile("hdfs://..").map(("pv",1)).reduceByKey(_+_).saveAsTextFile("hdfs://...")
    
    uv:在线用户量:通过ip去重,按照(“uv”,1)
    
    topK:微博热门词汇
        top谁--》wordcount--》排序--》take(正序)top(倒叙)
    

    环比的pv uv

    网站分析的文档
    

    mobileLocation(家庭位置、工作位置)

    1、先将数据进行清洗(家庭、工作)
    2、针对家庭和工作进行重复数据收集
    3、分别对家庭和工作做计算(尾-时间,时间-头)
    4、数据去重
    5、转转转(手机号,(基站id,时间total))-》join(基站id)找坐标
    

    spark-sql

    ==hive
    

    操作的两种方式

    1、datafream
        1、创建SqlContext(sc)
        2、通过sc读取数据
        3、通过case class或者是structType创建表结构
        4、将数据加载到表结构中(Person或者Row)
        5、隐式转换sqlContext.implict._
        6、将RDD转换为DF//show
        7、注册成表
        8、sqlContext.sql("").show // write.   
    

    目标

    1、利用spark-sql从mysql中读写数据
    2、spark-sql能不能集成hive使用
    3、练习
    1、spar-streaming(对比storm)
    2、flume+spark-streaming
    3、kafka+spark-streaming
    

    如何从mysql中读数据

    1、必须有mysql的driver(上传mysql的jar包)
    2、加载mysql包(spark-shell --master spark://master1:7077 --jars mysql.jar --driver-class-path mysql.jar)
    3、读取数据的时候,设置(sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.56.204/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"dept","user"->"root","password"->"root")).load())
    4、mysql中的表结构会读吗?(有帮我们加载表结构)
    

    往mysql中写数据

    1、需要mysql的jar包
    2、sc读数据
    3、datefream.write.mode("append"/"overwrite").jdbc("url","table",properties(user,password))
    

    hive on spark-SQL

    1、安装hive,修改元数据库,加上hive-site.xml(mysql连接)
    2、将hive-site.xml文件拷贝到集群的conf下
    3、强mysql.jar拷贝到spark的lib下
    4、执行:sqlContext.sql("select * from table1")
                                                .show()  
                                                .write.mode("append").jdbc()    
                                                .foreachPartition(it => {
                                                    1、初始化连接
                                                    it.map(x =>{
                                                    2、写数据到存储层
                                                    })
                                                    3、关连接
                                                })
    

    什么是spark-streaming?

    spark流失处理的框架,能够很容易的构建容错、高可用的计算模型
    特点:1、易用;2、容错;3、集成;
    

    spark-streaming和spark的批处理有什么关系?

    spark-streaming是小批量的RDD处理方式
    

    spark-streaming的应用

    从tcp的client中读取数据,进行汇总操作
    还以从flume中读取数据
        poll:ip地址以flume为主
        push:IP地址以streaming为主
    
    还可以从kafka中读取数据
    

     

  • 相关阅读:
    Oracle Core 学习笔记二 Transactions 和 Consistency 说明
    Oracle AUTO_SPACE_ADVISOR_JOB 说明
    Windows 下 ftp 上传文件 脚本
    Oracle 11g 中 Direct path reads 特性 说明
    Linux 使用 wget 下载 Oracle 软件说明
    Oracle 10g read by other session 等待 说明
    Oracle 11g RAC INS06006 Passwordless SSH connectivity not set up between the following node(s) 解决方法
    SecureCRT 工具 上传下载数据 与 ASCII、Xmodem、Ymodem 、Zmodem 说明
    Oracle RAC root.sh 报错 Timed out waiting for the CRS stack to start 解决方法
    Oracle RESETLOGS 和 NORESETLOGS 区别说明
  • 原文地址:https://www.cnblogs.com/feifeicui/p/11002754.html
Copyright © 2011-2022 走看看