  • Importing into HBase by generating HFiles

    There are two key steps to importing a DataFrame into HBase via HFiles:

    First, generate the HFiles.
    Second, load the HFiles into HBase.

    The test DataFrame comes from MySQL. If you are not familiar with reading MySQL data as a DataFrame, see "Spark: reading MySQL data as a DataFrame".
    You can of course use any data source for the DataFrame; MySQL is simply the example here.

    1. MySQL configuration

    The MySQL connection information is kept in an external properties file, which makes later configuration changes easy.

    //Example configuration file:
    [hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
    #MySQL database configuration
    mysql.driver=com.mysql.jdbc.Driver
    mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
    mysql.username=user
    mysql.password=123456

    2. Required jar dependencies

    These are sbt dependencies; adapt them accordingly for Maven.

    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

    3. Complete code

    import java.io.FileInputStream
    import java.util.Properties

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.functions.{concat, lit}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * @author 利伊奥克儿-lillcol
      *         2018/10/14-11:08
      */
    object TestHFile {
      var hdfsPath: String = ""
      var proPath: String = ""
      var DATE: String = ""

      val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
      val sc: SparkContext = new SparkContext(sparkConf)
      val sqlContext: SQLContext = new HiveContext(sc)

      import sqlContext.implicits._

      def main(args: Array[String]): Unit = {
        hdfsPath = args(0)
        proPath = args(1)

        //Path where the HFiles are saved
        val save_path: String = hdfsPath + "TableTestHFile"
        //Get the test DataFrame
        val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

        val resultDataFrame: DataFrame = dim_sys_city_dict
          .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
        //Note: the "key" column must come first in resultDataFrame, because the remaining column names are sorted later
        saveAsHFile(resultDataFrame, "cf_info", save_path)
      }

      /**
        * Save a DataFrame as HFiles
        *
        * @param resultDataFrame the DataFrame to save as HFiles; its first column must be "key"
        * @param columnFamily    column family name (must already exist in HBase, otherwise the load will fail)
        * @param save_path       path where the HFiles are saved
        */
      def saveAsHFile(resultDataFrame: DataFrame, columnFamily: String, save_path: String): Unit = {
        val conf: Configuration = HBaseConfiguration.create()
        lazy val job = Job.getInstance(conf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) //set the map output key/value types
        job.setMapOutputValueClass(classOf[KeyValue])

        var columnsName: Array[String] = resultDataFrame.columns //get the column names; the first one is "key"
        columnsName = columnsName.drop(1).sorted //drop "key", then sort the remaining columns

        val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
          .map(row => {
            var kvlist: Seq[KeyValue] = List()
            var rowkey: Array[Byte] = null
            var cn: Array[Byte] = null
            var v: Array[Byte] = null
            var kv: KeyValue = null
            val cf: Array[Byte] = columnFamily.getBytes //column family
            rowkey = Bytes.toBytes(row.getAs[String]("key")) //row key
            for (i <- columnsName.indices) { //iterate over every non-key column
              cn = columnsName(i).getBytes() //column name
              v = Bytes.toBytes(row.getAs[String](columnsName(i))) //column value
              //The HFile output is keyed by ImmutableBytesWritable, so the RDD we build must also use ImmutableBytesWritable instances as keys
              kv = new KeyValue(rowkey, cf, cn, v) //wrap rowkey, cf, column name and value into a KeyValue
              kvlist = kvlist :+ kv //append the new KeyValue to kvlist (the order matters: everything must stay sorted)
            }
            (new ImmutableBytesWritable(rowkey), kvlist)
          })

        //Turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
        val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
          s.iterator
        })

        delete_hdfspath(save_path) //delete any previous data under save_path
        //Save the data
        result
          .sortBy(x => x._1, true) //keep the whole output sorted
          .saveAsNewAPIHadoopFile(save_path,
            classOf[ImmutableBytesWritable],
            classOf[KeyValue],
            classOf[HFileOutputFormat2],
            job.getConfiguration)
      }

      /**
        * Delete a path on HDFS
        *
        * @param url the path to delete
        */
      def delete_hdfspath(url: String) {
        val hdfs: FileSystem = FileSystem.get(new Configuration)
        val path: Path = new Path(url)
        if (hdfs.exists(path)) {
          hdfs.delete(path, true)
        }
      }

      /**
        * Read a MySQL table
        *
        * @param sqlContext
        * @param tableName name of the MySQL table to read
        * @param proPath   path of the properties file
        * @return the MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
        val properties: Properties = getProperties(proPath)
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }

      /**
        * Read a MySQL table with a filter condition
        *
        * @param sqlContext
        * @param table           name of the MySQL table to read
        * @param filterCondition filter condition
        * @param proPath         path of the properties file
        * @return the MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
        val properties: Properties = getProperties(proPath)
        val tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }

      /**
        * Load the configuration file
        *
        * @param proPath
        * @return
        */
      def getProperties(proPath: String): Properties = {
        val properties: Properties = new Properties()
        properties.load(new FileInputStream(proPath))
        properties
      }
    }

    4. Test code

    def main(args: Array[String]): Unit = {
      hdfsPath = args(0)
      proPath = args(1)

      //Path where the HFiles are saved
      val save_path: String = hdfsPath + "TableTestHFile"
      //Get the test DataFrame
      val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

      val resultDataFrame: DataFrame = dim_sys_city_dict
        .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
      //Note: the "key" column must come first in resultDataFrame, because the remaining column names are sorted later
      saveAsHFile(resultDataFrame, "cf_info", save_path)
    }

    5. Submit command

    nohup spark-submit --master yarn \
    --driver-memory 4G \
    --num-executors 2 \
    --executor-cores 4 \
    --executor-memory 8G \
    --class com.iptv.job.basedata.TestHFile \
    --jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
    tygq.jar \
    hdfs://ns1/user/hive/warehouse/ \
    /var/lib/hadoop-hdfs/tmp_lillcol/job.properties > ./TestHFile.log 2>&1 &

    6. Execution result

    [hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TableTestHFile
    0       0       hdfs://ns1/user/hive/warehouse/TableTestHFile/_SUCCESS
    12.3 K  24.5 K  hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info

    7. Loading the HFiles into HBase

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/TableTestHFile iptv:spark_test

    .....
    18/10/17 10:14:20 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/TableTestHFile/cf_info/fdc37dc6811140dfa852ac71b00b33aa first=200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ last=769_\xE4\xB8\x9C\xE8\x8E\x9E_GD_DG
    18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    18/10/17 10:14:20 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x16604bba6872fff
    18/10/17 10:14:20 INFO zookeeper.ClientCnxn: EventThread shut down
    18/10/17 10:14:20 INFO zookeeper.ZooKeeper: Session: 0x16604bba6872fff closed
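
    The shell command above is what the article uses. If you would rather trigger the load from Scala, a minimal sketch using the same LoadIncrementalHFiles class could look like the following; the table name iptv:spark_test and the HFile path are taken from the example above, and the 1.2-era HBase client API is assumed:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

    object BulkLoadTestHFile {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        val connection = ConnectionFactory.createConnection(conf)
        try {
          val tableName = TableName.valueOf("iptv:spark_test")
          //doBulkLoad moves the HFiles under the matching column family directory into the table's regions
          new LoadIncrementalHFiles(conf).doBulkLoad(
            new Path("hdfs://ns1/user/hive/warehouse/TableTestHFile"),
            connection.getAdmin,
            connection.getTable(tableName),
            connection.getRegionLocator(tableName))
        } finally {
          connection.close()
        }
      }
    }

    Either way, the load moves (rather than copies) the HFiles into the table's region directories, so the source directory is consumed by the load.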

    8. Checking the data in HBase

    hbase(main):005:0> scan 'iptv:spark_test',{LIMIT=>2}
    ROW                                   COLUMN+CELL
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:bureau_id, timestamp=1539742949840, value=BF55
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:bureau_name, timestamp=1539742949840, value=\x85\xAC\xE5
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:city_code, timestamp=1539742949840, value=112
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:city_id, timestamp=1539742949840, value=112
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:city_name, timestamp=1539742949840, value=\xB7\x9E
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:dict_id, timestamp=1539742949840, value=112
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:group_id, timestamp=1539742949840, value=112
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
     200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ   column=cf_info:sort, timestamp=1539742949840, value=112
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:bureau_id, timestamp=1539742949840, value=6AA0EF0B
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:bureau_name, timestamp=1539742949840, value=\xE5\x8F\xB8
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:city_code, timestamp=1539742949840, value=112
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:city_id, timestamp=1539742949840, value=112
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:city_name, timestamp=1539742949840, value=\xBE
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:dict_id, timestamp=1539742949840, value=112
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:group_id, timestamp=1539742949840, value=112
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:group_name, timestamp=1539742949840, value=\x8C\xBA
     660_\xE6\xB1\x95\xE5\xB0\xBE_GD_SW   column=cf_info:sort, timestamp=1539742949840, value=112

    9. Summary

    Multiple column families and multiple columns

    The approach above extends what could originally only be handled as a single column family with a single column to multiple column families and multiple columns.
    The key to the implementation is the following two pieces of code:

    var columnsName: Array[String] = resultDataFrame.columns //get the column names; the first one is "key"
    columnsName = columnsName.drop(1).sorted //drop "key", then sort the remaining columns

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = columnFamily.getBytes //column family
        rowkey = Bytes.toBytes(row.getAs[String]("key")) //row key
        for (i <- columnsName.indices) { //iterate over every non-key column
          cn = columnsName(i).getBytes() //column name
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) //column value
          //The HFile output is keyed by ImmutableBytesWritable, so the RDD we build must also use ImmutableBytesWritable instances as keys
          kv = new KeyValue(rowkey, cf, cn, v) //wrap rowkey, cf, column name and value into a KeyValue
          kvlist = kvlist :+ kv //append the new KeyValue to kvlist (the order matters: everything must stay sorted)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    //Turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)]
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })

    The advantage of a DataFrame is that it is structured data, so every field is easy to work with:

    • Get all column names with resultDataFrame.columns, and remove "key" with drop(1) (this only works because "key" is the first column).
    • Sort the column names with sorted (ascending by default); without this sort the job fails with the error shown further below.
    • Then use map to walk through the rows and a for loop to process each field; every field becomes a KeyValue that is appended to a list, giving RDD[(ImmutableBytesWritable, Seq[KeyValue])].
    • Use flatMapValues to turn RDD[(ImmutableBytesWritable, Seq[KeyValue])] into RDD[(ImmutableBytesWritable, KeyValue)] (see the toy sketch right after this list).
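
    For illustration only, with plain strings standing in for the real KeyValue objects, flatMapValues flattens each row's sequence while repeating the key (the sc below is the SparkContext defined in the full code):

    //Toy example of the flatMapValues step
    val pairs = sc.parallelize(Seq(("rowA", Seq("kv1", "kv2")), ("rowB", Seq("kv3"))))
    pairs.flatMapValues(_.iterator).collect()
    //=> Array((rowA,kv1), (rowA,kv2), (rowB,kv3))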

    After these steps we have an RDD[(ImmutableBytesWritable, KeyValue)] and can hand it straight to saveAsNewAPIHadoopFile.

    Sorting

    Two things are sorted here:

    • Row keys

    This one goes without saying: the output must be globally ordered. The code:

    //Save the data
    result
      .sortBy(x => x._1, true) //keep the whole output sorted
      .saveAsNewAPIHadoopFile(save_path,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
    • Column names

    //Column names must also be kept in order:
    var columnsName: Array[String] = resultDataFrame.columns //get the column names; the first one is "key"
    columnsName = columnsName.drop(1).sorted //drop "key", then sort the remaining columns

    Without the sort, the following error appears:

    18/10/15 14:19:32 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 2.0 (TID 3, iptve2e03): java.io.IOException: Added a key not lexically larger than previous. 
    Current cell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:area_code/1539584366048/Put/vlen=5/seqid=0, 
        lastCell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:dict_id/1539584366048/Put/vlen=2/seqid=0

    The message means the current column cf_info:area_code is lexically smaller than the previous column cf_info:dict_id; that is exactly why the column names must be sorted. The "key" column also has to be removed first, otherwise an extra cf_info:key column would be written, which is clearly not what we want.
    Putting "key" in the first position is what lets us strip it at this step; once the columns are sorted it would be much harder to remove cleanly.
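
    A tiny illustration of why "key" has to sit in the first position (the column names here are hypothetical):

    val columns = Array("key", "city_id", "area_code", "city_name")
    columns.drop(1).sorted //Array("area_code", "city_id", "city_name")
    //drop(1) can strip "key" only because it is at index 0; sorting first would bury it among the other columns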

    Save path

    The save path must not already exist, so simply delete it first:

    /**
      * Delete a path on HDFS
      *
      * @param url the path to delete
      */
    def delete_hdfspath(url: String) {
      val hdfs: FileSystem = FileSystem.get(new Configuration)
      val path: Path = new Path(url)
      if (hdfs.exists(path)) {
        hdfs.delete(path, true)
      }
    }

    Column family name

    The column family must already exist in HBase before the load; the individual columns do not.
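
    The table and its column family therefore have to be created up front. In the HBase shell that is just create 'iptv:spark_test', 'cf_info'; a minimal Scala sketch with the 1.2-era client API (assuming the iptv namespace already exists) could be:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = connection.getAdmin
    val tableName = TableName.valueOf("iptv:spark_test")
    if (!admin.tableExists(tableName)) {
      //cf_info is the column family passed to saveAsHFile above
      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor("cf_info")))
    }
    admin.close()
    connection.close()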

    Comparison

    Hive-HBase

    • Pros:

    It ties in with Hive, so it is easy to do further processing on the data.

    It is relatively simple to operate and less demanding.

    It can easily handle multiple column families and multiple columns.

    • Cons:

    A temporary table has to be created, which roughly doubles the space consumed.

    Loading the data is fast, but the insert into step takes time that grows with the data volume.

    HFile

    • Pros:

    Loading the data is very fast.

    The only files produced from start to finish are the HFiles, which saves space compared with the previous approach.

    • Cons:

    The data is hard to post-process, and querying it without tooling is unfriendly.

    It puts more demands on the developer.

    Reposted from: https://www.cnblogs.com/lillcol/p/9797061.html
