zoukankan      html  css  js  c++  java
  • 大数据技术-spark+hive+hbase研究

    大数据 spark 研究

    0基础入门)

    一 背景

    基础

    Scala 语言基础:Scala详细总结(精辟版++)

    spark 介绍    :  spark介绍

     

     

    二 环境

    部署spark

     

    <![if !supportLists]>1、<![endif]>环境准备
    1)配套软件版本要求:

    Java 6+ 

    Python 2.6+. 

    Scala version (2.10.x).


    2)安装好linuxjdkpython, 一般linux均会自带安装好jdkpython但注意jdk默认为openjdk,建议重新安装oracle jdk


    3IP10.171.29.191  hostnamemaster


    2、安装scala


    1)下载scala
    wget http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz

    2)解压文件
    tar -zxvf scala-2.10.5.tgz

    3)配置环境变量
    #vi/etc/profile
    #SCALA VARIABLES START
    export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
    export PATH=$PATH:$SCALA_HOME/bin
    #SCALA VARIABLES END

    $ source /etc/profile
    $ scala -version
    Scala code runner version 2.10.5 -- Copyright 2002-2013, LAMP/EPFL

    4)验证scala
    $ scala
    Welcome to Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51).
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala> 9*9
    res0: Int = 81

    3、安装spark
    1)下载spark
    wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

    2)解压spark
    tar -zxvf http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

    3)配置环境变量
    #vi/etc/profile
    #SPARK VARIABLES START 
    export SPARK_HOME=/mnt/jediael/spark-1.3.1-bin-hadoop2.6
    export PATH=$PATH:$SPARK_HOME/bin 
    #SPARK VARIABLES END

    $ source /etc/profile

    4)配置spark
     $ pwd
    /mnt/jediael/spark-1.3.1-bin-hadoop2.6/conf

    $ mv spark-env.sh.template spark-env.sh
    $vi spark-env.sh
    export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
    export JAVA_HOME=/usr/java/jdk1.7.0_51
    export SPARK_MASTER_IP=10.171.29.191
    export SPARK_WORKER_MEMORY=512m 
    export master=spark://10.171.29.191:7070

    $vi slaves
    localhost

    5)启动spark
    pwd
    /mnt/jediael/spark-1.3.1-bin-hadoop2.6/sbin
    $ ./start-all.sh 
    注意,hadoop也有start-all.sh脚本,因此必须进入具体目录执行脚本

    $ jps
    30302 Worker
    30859 Jps
    30172 Master

    4、验证安装情况
    1)运行自带示例
    $ bin/run-example  org.apache.spark.examples.SparkPi

    2)查看集群环境
    http://master:8080/

    3)进入spark-shell
    $spark-shell

    4)查看jobs等信息
    http://master:4040/jobs/

     

     

    部署开发环境

     

      下载安装ScalaI IDEScala IDE

      

    三 示例入门

      1 建议查看借鉴 spark安装目录地下的examples目录

     

    四 爬过的坑

     

    1 开启spark服务时,报错 

    #localhost port 22: Connection refused

      

    解决:是因为没有安装openssh-server,输入命令 sudo apt-get install openssh-server安装之后,即可解决

     

    2 在eclipse上建立的spark项目,无法运行,报错:错误: 找不到或无法加载主类 

      问题出现条件,当在项目中添加spark的jar包时,就会出现项目报错。

      

       解决:右键工程 ----> Properties --->Scala Compiler

      1 勾选 Use Project Settings

      2 在Scala Installation 下拉框选择一个能用的Scala版本,点击应用即可解决

     

     

     

     

     

     

     

     

     

    3 RDD 转换成为DataFrame

     

      

         val conf = new SparkConf()

        conf.setAppName("SparkSQL")

        conf.setMaster("local")

        

        val sc = new SparkContext(conf)

        val sq = new SQLContext(sc)

        import sq.implicits._ // 加上这句话,才能隐式的转换

        

        val people = sc.textFile("/wgx-linux/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

        people.registerTempTable("people")

        val result = sq.sql("select * from people")

     

       

    4 RDD 操作分为transformation与action

     

    在查询数据时,无论先前是何种变换,都是transformation直到去取结果时,才带有执行!若在没取结果前,计算效率时间,是错误的!

    五 基础概念

    HDFS

     

     

     

     

     

     

     

      一个Namenode节点管理成千上万个DatanodesNamenode相当于资源管理器,Datanodes相当于数据资源。

    一个文件分块存储到不同的Datanodes,每个块都会有副本。

     

    MapReduce

     

      假设有这么一个任务,需要计算出一个大文件存储的最大的数据,下图给出了mapReduce的计算过程。

       

     

    RDD

     

     

     

       RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

     

      例如map操作会返回MappedRDD,而flatMap则返回FlatMappedRDD。当我们执行map或flatMap操作时,不过是将当前RDD对象传递给对应的RDD对象而已transformation

     

     

    DDL

     

     

     

      数据库模式定义语言DDL(Data Definition Language),是用于描述数据库中要存储的现实世界实体的语言。一个数据库模式包含该数据库中所有实体的描述定义。

     

     

    六  spark +hbase

     

     

     1 在Spark-evn.sh里添加hbase的库(否则会报错误)

    SPARK_CLASSPATH=/home/victor/software/hbase/lib/*

     

    2 hbase数据结构

      

     

    3 连接数据库

        val sqconf = HBaseConfiguration.create()  

        sqconf.set("hbase.zookeeper.property.clientPort""2181")

        sqconf.set("hbase.zookeeper.quorum""localhost")

        val admin = new HBaseAdmin(sqconf)

     

     

    4 增删改查

    val sqconf = HBaseConfiguration.create()  

         sqconf.set("hbase.zookeeper.property.clientPort""2181")

         sqconf.set("hbase.zookeeper.quorum""localhost")

         val admin = new HBaseAdmin(sqconf)

        

        

        // 建表

         if (!admin.isTableAvailable("test-user")) {//检查表是否存在  

          print("Table Not Exists! Create Table")  

          val tableDesc = new HTableDescriptor("test-user")  //表名

          tableDesc.addFamily(new HColumnDescriptor("name".getBytes()))//添加列簇

          tableDesc.addFamily(new HColumnDescriptor("id".getBytes()))//添加列簇

          admin.createTable(tableDesc)//建表

          

          println("create table test-user")

          

          

         }

        

        // 增

        val table = new HTable(sqconf"test-user");  

        for (i <- 1 to 10) {  

          var put = new Put(Bytes.toBytes("row"+i)) 

          put.add(Bytes.toBytes("name"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))//往列簇basic添加字段name值为value

          put.add(Bytes.toBytes("name"), Bytes.toBytes("xing"), Bytes.toBytesi))

          put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytesi))

          table.put(put

        }  

        table.flushCommits()

        

        // 删

        val delete = new Delete(Bytes.toBytes("row0"))//删除row1数据

        table.delete(delete)

        table.flushCommits()

        // 改

        var put = new Put(Bytes.toBytes("row1")) 

        put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytes"10001100"))

        table.put(put

        table.flushCommits()

        

        // 查

         val row1 =  new Get(Bytes.toBytes("row1"))

        val HBaseRow = table.get(row1)//获取row为scutshuxue的数据

        if(HBaseRow != null && !HBaseRow.isEmpty){

          var result:AnyRef = null

          result = Bytes.toString(HBaseRow.getValue(Bytes.toBytes("id"), Bytes.toBytes("id")))//得到列簇为address属性city的值

          println("result="+result)

    }

     

     

    5 带条件的查询

       import CompareFilter._

        // 带条件的查询

        // 查询列族为id,列为id.值为3的数据行

        val filter  = new SingleColumnValueFilter(Bytes.toBytes("id"), Bytes.toBytes("id"),  

                        CompareOp.EQUAL, Bytes.toBytes("10001100")); 

        

        val scan = new Scan()

        scan.setFilter(filter)

        

        val scanner = table.getScanner(scan)

        var rsu = scanner.next()

        while(rsu != null){

          

          println("rowkey = " + rsu.getRow()); 

          println("value = " + rsu.getValueAsByteBuffer(Bytes.toBytes("id"), Bytes.toBytes("id"))); 

          rsu = scanner.next()

    }

     

     

      

     

    6 其他操作(group order 等)

      需要一个转换,将hbase 转换成RDD, 在转化成DataFrame,注册数据表,执行sql

    sqconf.set(TableInputFormat.INPUT_TABLE"test-user")

        val usersRDD = sc.newAPIHadoopRDD(sqconfclassOf[TableInputFormat],

          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

          classOf[org.apache.hadoop.hbase.client.Result])

          

          val personRDD = usersRDD.map(p => Peoson(p._2.getValue(Bytes.toBytes("name"),Bytes.toBytes("name")) +"",

              p._2.getValue(Bytes.toBytes("xing"),Bytes.toBytes("name"))+"",p._2.getValue(Bytes.toBytes("id"),Bytes.toBytes("id"))+""))

        

         if(personRDD != null){

           println("suc")

           println(personRDD.count())

         }

         else

           println("ERROR")

           

        println("to DataFrame suc")

        

        import sq.implicits._ 

        val people = personRDD.toDF()

        people.registerTempTable("people")

        

        val result = sq.sql("select * from people")

        

        result.map(t => "Name: " + t(0)).collect().foreach(println)

     

     

    七 spark +hive 

       安装spark之后,自带有hive,所以不需要另外部署hive。

     

    1 特点

     

      Hive不支持常规的SQL更新语句,如:数据插入,更新,删除。

      Hive 数据查询用时以数分钟甚至数小时来进行计算即非即时性

      Hive 支持类sql的语法,即hql

     

    2 hive导入数据

     

      def addDataToTable(sq:HiveContext){

        sq.sql("load data local inpath '/usr/wgx/test_user.txt' into table test_user1")

        sq.sql("load data local inpath '/usr/wgx/test_user_info.txt' into table test_user_info1")

      }

     

     

       

     

    八 spark +mysql

     

    九 调研分析

     

    结果分析

    注:1 order by 为随机数排序

     

                                           (单位:秒)

    方式

    order by

     group

    sum group

    left join

    数量

    spark+hbase

    46.727

    15.490

    13.320

    45.943

    100

    spark+hive 

    11.209

    4.490

    2.814

    6.247

    100

    spark +mysql

     

     

     

     

    100

     

    续                                     (单位:秒)

    方式

    =

    <>

    <

    >

    数量

    spark+hbase

    11.682

    29.947

    10.484

    10.849

    100

    spark+hive 

    0.694

    0.826

    0.900

    0.941

    100

    spark +mysql

     

     

     

     

    100

     

       

                                          (单位:秒)

    方式

    order by

     group

    sum group

    left join

    数量

    spark+hbase

    72.114

    25.347

    23.190

    66.321

    200

    spark+hive 

    12.009

    5.026

    3.385

    8.620

    200

    spark +mysql

     

     

     

     

    200

     

    续                                   (单位:秒)

    方式

    =

    <>

    <

    >

    数量

    spark+hbase

    20.073

    20.134

    19.883

    20.679

    200

    spark+hive 

    1.33

    1.564

    1.359

    1.355

    200

    spark +mysql

     

     

     

     

    200

     

     

                   

    线性分析

     

     

              线性分析(spark +hbase

          (单位:秒)

     

    100

    200

    600

    order by

    46.727

    72.114

    138.561

     group

    15.490

    25.347

    51.306

    sum group

    13.320

    23.190

    50.565

    left join

    45.943

    66.321

    281.226 

    [join1700万】

    =

    11.682

    20.073

    46.582

    <>

    29.947

    20.134

    51.231

    <

    10.484

    19.883

    45.515

    >

    10.849

    20.679

    47.663

     

              线性分析(spark +hive

                   (单位:秒)

     

    100

    200

    600

    order by

    11.209

    12.009

     

     group

    4.490

    5.026

     

    sum group

    2.814

    3.385

     

    left join

    6.247

    8.620

     

    =

    0.694

    1.331

     

    <>

    0.826

    1.564

     

    <

    0.900

    1.359

     

    >

    0.941

    1.355

     

     

     

     

    十 hive hbase整合

     

    整合方式一 sparkSQL 通过hive查询hbase数据

     

    原理:

     1 读取hbase表,转换成RDD

     2 将RDD转换成对象模型RDD

     3 对象模型RDD注册成虚拟临时表

     4 从第三步的虚拟临时表的数据导入hive表

     5 读取hive表为RDD

     6 再将第五步的RDD注册临时表

     7 查询

     

    转换效率

     

    注:从hbase转换成hive (单位:秒)

    数量

    200

    400

    600

    时间

    97.513

    144.007

    226.221

     

    查询效率线性分析(单位:秒)

     

    200

    400

    600

    order by

    12.748

    26.016

    56.231

     group

    5.114

    7.871

    21.625

    sum group

    3.765

    5.869

    9.379

    left join

    10.935

    34.467

    31.471

    =

    2.041

    7.298

    5.727

    <>

    2.662

    5.534

    8.502

    <

    1.907

    4.115

    5.499

    >

    2.120

    4.049

    5.644

     

     

    2 整合方式二  以hive sql方式查询hbase

     

    原理:

    spark 将hbase表转换成RDD (模型转换,并没执行)

    RDD 通过hive Context 注册为临时表

    hive 执行查询

     

    整合效率

     

    200

    400

    600

    order by

     

     

    148.701

     group

     

     

    78.277

    sum group

     

     

    53.201

    left join

     

     

    314.468 [1]

    =

     

     

    46.615

    <>

     

     

    53.453

    <

     

     

    46.097

    >

     

     

    46.845

     

    :600万的表与1700万数据表的join

     

    整合方式三 hive表外部关联hbase

     

      原理:

     

      在创建hive表时,在创建表的sql上加上对hbase表的关联

      

    sql("CREATE EXTERNAL TABLE user_t_info(id string,userId string,name string,phone string)"+

     

           "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'"+

     

    "WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,id:id,info:name,info:phone ")"+

     

    "TBLPROPERTIES("hbase.table.name" = "user_t_info")")

     

       

    整合效率

     

     

    200

    400

    600

    order by

     

     

    240.608

     group

     

     

    88.908

    sum group

     

     

    86.667

    left join

     

     

     

    =

     

     

    79.768

    <>

     

     

    80.462

    <

     

     

    80.645

    >

     

     

    79.237

     

     

    十一 结论分析

    1 spark +hive 比spqrk+hbase 效率高

     

    2 随着数据量的增加,spark+hive 没有成线性增加,spark+hbase大致成线性关系增加,总体上,spark+hive的增加幅度较小

     

    3 shpark +hbase +hive 

      从hbase转换成hive,数据量和时间大致成线性关系,比纯线性关系好一点

      查询效率来讲,随着数据量的增加,虽然时间有所增加,但幅度不大

     

    十二 附录

    测试源码

     

     

     

    极点科技

    诚信 专注 创新

  • 相关阅读:
    【Android Developers Training】 73. 布局变化的动画
    【Android Developers Training】 72. 缩放一个视图
    【Android Developers Training】 71. 显示翻牌动画
    svn更改地址怎么办
    python学习手册
    failed to bind pixmap to texture
    Ubuntu 12.04安装Google Chrome
    svn update 时总是提示 Password for '默认密钥' GNOME keyring: 输入密码
    重设SVN 的GNOME keyring [(null)] 的密码
    Nginx + uWSGI + web.py 搭建示例
  • 原文地址:https://www.cnblogs.com/JDtech/p/5320408.html
Copyright © 2011-2022 走看看