zoukankan      html  css  js  c++  java
  • 大数据学习——spark笔记

    变量的定义

    val a: Int = 1
    var b = 2
    

    方法和函数

    区别:函数可以作为参数传递给方法
    方法:
            def test(arg: Int): Int=>Int ={
                方法体
            }
            val fun = (test _: Int =>(Int=>Int))=>函数体
    

    逻辑执行语句

    val a = if(条件){
    
    执行逻辑
    返回值
    }else{
    执行逻辑
    }
    
    while(条件){
    执行逻辑
    }
    
    val arr = Array(1,2,3,4,5)  
    for(i <- 0 to arr.length ){
        arr(i)
    }
    
    for(i <- arr){
        i
    }
    

    集合操作

    Array ArrayBuffer List ListBuffer set Map tuple
    
    val arr = Array(1,2,3,4,5)
        arr(0)
        arr += 9
    val arrb = ArrayBuffer(1,2,3,4,5)
        arrb(0)
    val list = List(1,2,3,4)
    
    val tuple = (1,"string")
    tuple._1
    
    val map = Map("a"->1)
    val map = Map(("a",1))
    

    类(重要)

    类的主构造器:主构造器里面的变量会被执行,方法会被加载,调用的方法会被执行
    calss Test(){
        val int = 1
        def test(){
        }
    
        …………
        …………
        …………
        test
    
    }
    
    辅助构造器:重载
    
    extends with
    

    集合的高级操作(重要)

    map:将集合中的变量循环出来做操作
    flatMap:将集合中的参数压循环出来做操作
    val arr = Array("hello tom","hello lilei","hello hanmeimei")
    map:(hello tom),(hello lilei),(hello hanmeimei) 
    flatMap:(hello tom hello lilei hello hanmeimei)
    filter:过滤想要的元素
    groupBy:按照key进行分组,分组之后value合并到Array
    mapValues:针对kv类型的数据,只对value进行操作
    sortBy:针对某个元素进行排序
    val arr Array("hello tom","hello lilei")
    val result =  arr.flatMap(x => x.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortBy(_._2).recerse
    val result = arr.flatMap(_.split(" ")).map((_,1)),reduceByKey(_+_).sortBy(_._2,true)
    

    高级特性

    高阶函数:把函数作为参数传递给方法或者函数,函数在函数式编程中是第一位的。
        map(函数)
    
    隐式转换(PreDef):对类的增强,Int类没有to这个方法,然后再RichInt类中包含这个方法,我们只需要在某个地方将Int转换成RichInt,然后在用的地方import就ok了
    class RichFile(file: File){
    def read(file:File):String={
        Source.fromFile(file.getPath).mkString
    }
    
    }
    object RichFile{
    
    implict def file2RichFile(file:File)=RichFile(file)
    }
    
    object Test{
    def main(args:Array[String]){
        import RichFile.file2RichFile
        val file = new File("c://words.txt").read
    }
    
    }
    
    
    柯里化:将原来接收多个参数的方法或者函数,编程接收一个一个的方法或者函数,返回的是函数
    
        def test(a:Int)(b:Int)(c:Int){
            a+b+c
        }
        val fun = def(1) _
    

    actor 并发编程的接口(非常重要)

    actor:用消息传递的方式实现了并发编程,写起来像线程,玩起来像socket
    AKKA:actorSystem actOf
    

    spark(what、how、why、use、运维<源码的理解>)

    课程目标

    1、知道spark是干啥的
    2、会安装spark
    3、会写spark程序(scala、python、R、java)
    

    什么是spark?

    内存迭代式计算,利用DAG有向无环图
    特别非常快:在硬盘快mr10x,在内存,落你一条街100x
    易用性:代码写的少,可以用n中语言写,你mr就一种
    通用性:我集成了core、sql、streaming、MLlib、graphx,能交互
    无处不在:数据源多种(hdfs、hbase、mysql、文件),计算平台多种(standalone、YARN、mesos)
    

    how1(部署)

    1、下载安装包
    2、上传包
    3、解压
    4、重命名
    5、修改环境变量
    6、修改配置文件(重要,去官方文档看(别人的帖子,例如:www.wangsenfeng.com)、所有集群跑不起来都在这,通过log文件查看)
    7、下发(scp)
    8、修改其他机器的配置(可选)
    9、格式化(可选)
    10、启动集群(注意依赖关系)
    

    启动

    方式1:
        standalone-单master:
                            java_home、masterip、masterport、hadoopconf
    方式2:
        standalone-多master:
                            java_home、masterport、hadoopconf、zookeeper
    

    运行shell

    运行spark-shell的两种方式:
    1、直接运行spark-shell
        单机通过多线程跑任务,只在运行spark-shell的地方运行一个进程叫sparksubmit
    2、运行spark-shell --master spark://master1:7077
        将任务运行在集群中,在运行spark-shell的机器上运行sparksubmit进程,运行executor在worker上
    

    用api开发spark代码

    1、创建项目
    2、到pom.xml(在day01中)
    3、创建scala类
        import org.apache.spark.SparkContext //一切任务的起源,所有的计算的开头。(上下文)
        import org.apache.spark.SparkConf   //spark的配置信息,相当于mr当中的那个conf,他会覆盖掉默认的配置文件(如果你进行了配置),他的主要作用,这只app的名字,设置时运行本地模式还是集群模式
    4、写代码(参考官方文档)
        如果是在windows上运行,设置setMaster("local[n]")
        如果是线上运行,把setMaster("local[n]")去掉,或者setMaster("spark://master1:7077")(不建议)
        注意两个关键词:transformation,action
    

    提交任务到集群

    1、打jar包,去掉setMaster
    2、将jar上传到linux
    3、执行命令 
        spark-submit 
        --master spark://master1:7077 
        --executor-memory 512M 
        --total-executor-cores 2 
        --class org.apache.spark.WordCount 
        xxx.jar 
        in     
        out 
    

    用 python开发spark程序

    1、开发python的程序
    2、运行在集群,用spark-submit
    

    用R开发spark

    1、先安装R
        yum –y install gcc gcc-c++,
        yum –y install gcc-gfortran
        yum –y install readline-devel
        yum –y install libXt-devel
        yum –y install fonts-chinese tcl tcl-devel tclx tk tk-devel
        yum -y install epel-release
        vim /etc/yum.repos.d/epel.repo
        将
        #baseurl
        mirrorlist
        改成
        baseurl
        #mirrorlist
        yum -y install R 安装R语言
    
        2、然后按照官网的玩
    
        单机启动
         sparkR
    
        启动standalone
        sparkR --master spark://master1:7077
    
        启动yarn
        sparkR --master yarn-client 
    
        从hive读数据等
         sparkR --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar
    
        集群提交
        spark-submit examples/src/main/r/dataframe.R
    
        3、监控
        http://master1ha:4040/
    

    思考问题

    1、什么是RDD
    2、什么是stage
    3、什么是DAG
    

    随堂问题

    1、老师好,刚刚那个mr的container,是由resourceManager创建好,然后序列化后,再给NodeManager那些来反序列化的吗?
    答:是由resourcemanager创建好序列化发给applicationMaster,然后applicationMaster找nodemanager去启动资源
    2、老师,刚才那个执行结果分成两个文件,它的分区机制是将不同的单词进行hash 吗?
    答:是的,hash分区
    3、在集群上,R运行需要安装R,Python文件运行,需要安装Python么?
    答:需要安装,linux默认帮我们安装了python
    

    复习

    什么spark?

    内存迭代式计算,每个算子将计算结果保存在内存中,其他算子,读取这个结果,继续计算
    4个特性:快(10x、100x),易用性(代码优美、可以用4种语言开发依赖外部数据源(hdfs、本地文件、kafka、flume、mysql))、
    通用性(cores、sql、streaming、MLlib、graphx,交互使用)、随便那个平台都可以跑(standalone、yarn、mesos)
    

    搭建spark

    一主多从:
            1、下载安装包(依赖的hadoop的版本,source是下载源码的)
            2、上传到集群
            3、解压
            4、重命名(版本更新不需要修改环境变量)
            5、修改环境变量(root)
            6、修改配置文件(spark-env.sh:JAVA_HOME,master_ip,master_port,hadoop_conf_dir、java_opts(-D);slaves(从的域名))
            7、下发(scp)
            8、启动集群(start-all.sh;start-master.sh;start-slave.sh master的地址)
            9、spark的协议:spark://master:7077
            10、浏览器端口:master:8080
            11、R语言的浏览器任务查看:masterR:4040
    
    多主多从:多加了zookeeper调度(选举机制)
    

    命令行

    1、spark-shell:在当台机器上启动一个进程sparksubmit,通过多线程的方式模拟集群
    2、spark-shell --master spark://master1:7077:启动的事集群版shell,任务会提交到集群运行,
        在当前的机器启动的集成sparksubmit,在丛集器启动的集成叫xxxxexecutorbackend
        默认没有加从机器的cores和memory参数,会在每台丛集器启动一个executor进程,
        如果加了--total-executor-cores n会启动n个executor进程
    

    命令行版的wordcount

    注意:在sparkshell中帮我们默认加载了SparkContext,并命名为sc;也帮我们创建了SparkConf,并且设置了appname(“sparkshell”),并且设置了setmaster(“local/spark://...”)
    sc.textFile("file:///... ; hdfs://...").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => x+y).sortBy(_._2,false).saveAsTextFile("hdfs://...")
    

    spark的api操作

    1、scala
    2、python
        #!/usr/bin/python
        from pyspark import SparkConf , SparkContext
        sc.textFile("hdfs://...").flatMap(lambda x: x.split(" ")).map(lambda y:(y,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://...")
    3、R
        ......
    4、java
        ......
    

    RDD

    目标

    1、什么是RDD?
    2、RDD的创建方式和依赖关系
    3、DAG有向无环图的意义
    4、掌握划分stage的过程
    5、掌握RDD的所有操作!!!!
    

    什么是RDD?

    RDD(Resilient Distributed Datasets )定义为弹性的分布式数据集,包含了只读的、分区的、分布式计算的概念;RDD是个类
    1、一个数据分区的列表(hdfs的所有数据块的位置信息,保存在我RDD类成员变量Array中)
    2、保存了在数据块上的计算方法,这个计算方法会应用到每一个数据块上
    3、一个对于其他RDD的依赖,是一个集合,spark就是通过这种依赖关系,像流水线一样处理我们的数据,
        当某个分区的数据计算失败,只需要根据流水线的信息,重新计算这一个分区的数据即可,不需要计算全部数据
    4、分区方式(partitioner),决定RDD数据来源的分区和数据计算后的分区:hashpartitioner;rangepartitioner
    5、位置相关性(hdfs)
    

    如何创建RDD

    1、通过序列化集合的方式创建RDD(parallelize,makeRDD)
    2、通过读取外部的数据源(testFile)
    3、通过其他的rdd做transformation操作转换成新的RDD
    
    RDD的两钟算子:
    1、transformation:
        通过算法对RDD进程转换,延迟加载的一个处理数据及的方法:
        map flatMap reduceByKey 
    2、Action:
        触发整个job进行计算的算子
        collect top first saveAsTextFile
    

    广播(broadcast)变量

    :其广泛用于广播Map Side Join中的小表,以及广播大变量等场景。这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。
    Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。相比Hadoop的distributed cache,广播的内容可以跨作业共享。
    Broadcast的底层实现采用了BT机制
    

    ipLocation

    1、广播变量
    2、ip转long(分金定穴循八卦,toolong插棍左八圈)
    3、二分法查找:(上下循环寻上下,左移右移寻中间)
    4、分区存数据库(foreachPartition)
    

    作业:

    1、把所有的算子运行一遍
    2、把iplocation的思想理解,代码运行
    3、有富余时间的情况下,敲一个iplocation就行了
    

    问题

    1、每个stege是作为一个任务整体,序列化后发送给一台机器反序列话执行吗?里面包含的多个RDD是串联起来工作的吗?
    答:是的
    2、MapReduce中MRappmaster,启动mapTask的时候,那个map类实例是不是已经序列化并且被包含在ResourceManager的任务队列中的任务对象中?
    答:是的
    3、老师,只有对于于key-value的RDD,才会有Partitioner,怎么理解??
    答:kv型数据的RDD按照Key进行分组操作,非kv的数据不需要分组操作,因为没有响应的算子提供
    4、讲解RDD的时候,可以把跟综进源码的路径加在笔记里吗?希望可以在阅读源码的基础上理解RDD
    答:通过crtl+shift+R打开源码RDD.scala就能查看了
    5、还有那个分片的工作,是任务提交之前就做好了吧,MapReduce的job.split文件好像就是在任务提交之前就在客户端通过fileinputformat已经分好了,然后再发送到HDFS上
    答:对的,我们的分片也是做好了之后发送任务
    6、getPartitions方法在整个运行过程中总共会调用几次? 数据都是分开运行的吗? 如果是分开运行的,那只需要在第一个MapRDD调用一次。请问这样理解对吗?
    答:getPartitions是在任务开始之前调用一次,拿出分区的地址进行分发任务
    

    复习

    1、什么是RDD
        一个分区的列表(FileSplit),决定读取的文件在哪
        一个应用在每个分区上的算子
        一个对其他RDD的依赖集合
        可选:一个决定数据存储时的分区方式
        可选:如果在yarn上运行,决定数据本地运行的方式,移动数据不如移动计算
    2、如何创建RDD
        1、通过序列化集合的方式(makeRDD、parallelize)
        2、通过读取文件的方式
        3、通过其他的RDD进行transformation转换而来
    3、RDD的算子
        transformation:(懒加载)
        map、flatMap、filter、mapPartition、groupByKey、reduceByKey、union、intersaction、distinct、aggregateByKey
        Action:(触发任务的进行)
        top、take、first、count、collect、foreach、savaAsTextFile、reduce
    
    4、iplocaltion:(ip的热力图)
        1、广播变量:共享的内存,只读的,只能追加的
        2、ip转long:分金定穴循八卦、toolong插棍左八圈
        3、二分法查找:上下循环寻上下,左移右移寻中间
        4、foreachPartition:对每个分区的数据进行操作,可以在分区操作的时候创建外部链接(jedis、mysql、hbase)
    

    目标:

    1、掌握RDD的stage划分
    2、掌握DAG概念
    3、学会使用如何创建RDD的缓存
    4、学会使用如何创建RDD的checkpoint
    

    RDD的依赖关系

    宽依赖:依赖的RDD产生的数据不只是给我用的。父RDD不只包含一个子RDD的数据(多对对),非独生子女
    窄依赖:依赖的RDD产生的数据只给我自己。父RDD只包含一个子RDD的数据(一对一、一对多)。独生子女
    Lineage(血统):RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。
            RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
    

    找依赖关系划分stage的目的

    1、如何通过stage的划分设置缓存
        1、在窄依赖想设置缓存的时候,用cache
        2、在宽依赖想设置缓存的话,用checkpoint
    

    如何设置cache和checkpoint

    cache:
            someRDD.cache():将缓存放到内存中
            someRDD.persist(StorageLevel.MEMORY_AND_DISK):根据自己的需要设置缓存的位置(内存和硬盘)
    
    checkPoint:可以吧RDD计算后的数据存储在本地的磁盘,也可以是hdfs
            sc.setCheckpointDir("hdfs://master1:9000/ceshi/checkpoint")//设置checkpoint的路径
            someRDD.checkpoin()
    

    什么时候设置缓存,什么时候设置checkpoint

    遇到宽依赖设置checkpoint,窄依赖想缓存的话设置cache
    

    cache 和 checkpoint的区别?

    cache只是缓存数据,不改变RDD的依赖关系
    checkpoint是生成了一个新的RDD,后面的RDD依赖的关系已经改变。
    
    checkpoint--》cache--》重算
    

    四个案例

    1、pv:点击率
    2、uv:在线用户数
    3、topk:微博热门词汇
    4、moblelocation:统计家庭位置和工作位置
    

    什么是spark-sql

    相当于hive
    

    书写代码的两种模式

    datafream:spark-sql自己的语法
    
    sql:spark-sql集成sql的语法
    1、通过sc加载任意类型数据
    2、创建case class Person(id:Int , name:String , age:Int)(表结构)
    3、将数据添加到表结构中map
    4、注册表
    5、通过sqlContext.sql()
    

    spark-sql的api

    两种模式(表的schema加载的两种模式)
    1、通过case class的方式加载表结构
    2、通过StructType去自己定义表结构
    

    作业

    1、moblelocation回去运行一遍,如果有富余时间敲几遍
    2、把sparl-sql的命令行和代码敲一遍
    

    复习

    RDD的依赖关系

    1、宽依赖(多对多)
    2、窄依赖(一对一 和 多对一)
    

    通过宽依赖和窄依赖划分stage

    1、遇到宽依赖,宽依赖到上一个宽依赖之间的所有窄依赖是一个stage
    2、stage之间有包含关系
    

    划分stage的目的

    1、用来划分task
    2、用来指导什么地方需要设置什么样的缓存(cache、checkpoint)
    

    如何设置缓存

    1、someRDD.cache()
    2、someRDD.persist(StorageLeavel.MEMORY_AND_DISK_2)
    3、sc.setCheckPointDir("hdfs://...")
        someRDD.checkpoint()
    

    DAG

    一个任务组成的流水线就是DAG(DAGscheduler)
    DAG可以划分成n个stage
    stage对应n个RDD
    把stage封装成Task(stage),把task分发下去(TaskScheduler)
    

    PV UV topK

    pv:点击率
    sc.textFile("hdfs://..").map(("pv",1)).reduceByKey(_+_).saveAsTextFile("hdfs://...")
    
    uv:在线用户量:通过ip去重,按照(“uv”,1)
    
    topK:微博热门词汇
        top谁--》wordcount--》排序--》take(正序)top(倒叙)
    

    环比的pv uv

    网站分析的文档
    

    mobileLocation(家庭位置、工作位置)

    1、先将数据进行清洗(家庭、工作)
    2、针对家庭和工作进行重复数据收集
    3、分别对家庭和工作做计算(尾-时间,时间-头)
    4、数据去重
    5、转转转(手机号,(基站id,时间total))-》join(基站id)找坐标
    

    spark-sql

    ==hive
    

    操作的两种方式

    1、datafream
        1、创建SqlContext(sc)
        2、通过sc读取数据
        3、通过case class或者是structType创建表结构
        4、将数据加载到表结构中(Person或者Row)
        5、隐式转换sqlContext.implict._
        6、将RDD转换为DF//show
        7、注册成表
        8、sqlContext.sql("").show // write.   
    

    目标

    1、利用spark-sql从mysql中读写数据
    2、spark-sql能不能集成hive使用
    3、练习
    1、spar-streaming(对比storm)
    2、flume+spark-streaming
    3、kafka+spark-streaming
    

    如何从mysql中读数据

    1、必须有mysql的driver(上传mysql的jar包)
    2、加载mysql包(spark-shell --master spark://master1:7077 --jars mysql.jar --driver-class-path mysql.jar)
    3、读取数据的时候,设置(sqlContext.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.56.204/bigdata","driver"->"com.mysql.jdbc.Driver","dbtable"->"dept","user"->"root","password"->"root")).load())
    4、mysql中的表结构会读吗?(有帮我们加载表结构)
    

    往mysql中写数据

    1、需要mysql的jar包
    2、sc读数据
    3、datefream.write.mode("append"/"overwrite").jdbc("url","table",properties(user,password))
    

    hive on spark-SQL

    1、安装hive,修改元数据库,加上hive-site.xml(mysql连接)
    2、将hive-site.xml文件拷贝到集群的conf下
    3、强mysql.jar拷贝到spark的lib下
    4、执行:sqlContext.sql("select * from table1")
                                                .show()  
                                                .write.mode("append").jdbc()    
                                                .foreachPartition(it => {
                                                    1、初始化连接
                                                    it.map(x =>{
                                                    2、写数据到存储层
                                                    })
                                                    3、关连接
                                                })
    

    什么是spark-streaming?

    spark流失处理的框架,能够很容易的构建容错、高可用的计算模型
    特点:1、易用;2、容错;3、集成;
    

    spark-streaming和spark的批处理有什么关系?

    spark-streaming是小批量的RDD处理方式
    

    spark-streaming的应用

    从tcp的client中读取数据,进行汇总操作
    还以从flume中读取数据
        poll:ip地址以flume为主
        push:IP地址以streaming为主
    
    还可以从kafka中读取数据
    

     

  • 相关阅读:
    OOP、DI、IOC的情爱恩仇录
    NuGet:添加EntityFramework
    DataGrid之DataGridComboBoxColumn,DataGridCheckBoxColumn,DataGridHyperlinkColumn,DataGridTextColumn
    JohnSon:动态创建模块选项卡
    maf实例
    MVVM理解之逐步重构成为MVVM模式,比MVC的独到之处
    我学Unity系列1:Unity和Mef的比较
    微软一站式代码资料
    匿名方法,泛型委托,Lambda表达式
    关于JS表单验证(转)
  • 原文地址:https://www.cnblogs.com/feifeicui/p/11002754.html
Copyright © 2011-2022 走看看