A Summary of Writing to Multiple HBase Tables from MapReduce and Spark

    Author: Syn良子. Source: http://www.cnblogs.com/cssdongl. Please credit the source when reposting.

    As most people know, when writing to a known HBase table from MapReduce or Spark, you simply declare the following in the MapReduce or Spark driver class:

    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tablename);
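
    For context, a minimal driver-side sketch might look like the following (the job name, the "clickcounts" table name, and the class name are assumptions for illustration; only the OUTPUT_TABLE line comes from the post):

    // Sketch: driver configuration for writing every Put to one pre-created table.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class SingleTableDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "write-to-one-hbase-table");
        job.setJarByClass(SingleTableDriver.class);

        // All output Puts are routed to this single table ("clickcounts" is illustrative).
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "clickcounts");
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // Mapper/reducer classes and the input format are omitted here.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }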

    In MapReduce, the mapper or reducer then writes the Puts directly through the context, while in Spark you build a PairRDDFunctions containing Puts and call saveAsHadoopDataset. The MapReduce write side is sketched below.
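
    A minimal sketch of that write side in a reducer (the key/value types and the "info:count" column are assumptions for illustration):

    // Sketch: a reducer that emits Puts; TableOutputFormat sends them all to OUTPUT_TABLE.
    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class SingleTableReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
      @Override
      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
          sum += v.get();
        }
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        // With TableOutputFormat the output key is not used to choose a table.
        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
      }
    }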

    A common requirement, however, is that the processed output has to be written to several HBase tables, or that the table name is not known in advance and must be built from some field in the data.

    Since the table name is unknown, TableOutputFormat.OUTPUT_TABLE cannot be set up front. This requirement is still easy to meet; the MapReduce and Spark approaches are summarized below (and, as you will see, they end up in the same place).

    1. Writing to multiple HBase tables from MapReduce

    Add the following to the MR job's main method:

    job.setOutputFormatClass(MultiTableOutputFormat.class);

    Then, in the mapper or reducer, construct the table name and the Put from the relevant fields and write them through the context to reach multiple HBase tables, as sketched below.
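
    A minimal reducer sketch for this multi-table case (the "_clickcounts" table-name suffix and the "info:count" column are assumptions borrowed from the Spark example further below; with MultiTableOutputFormat the output key names the destination table):

    // Sketch: a reducer that routes each Put to a table derived from the row key.
    // Requires job.setOutputFormatClass(MultiTableOutputFormat.class) in the driver.
    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MultiTableReducer
        extends Reducer<Text, LongWritable, ImmutableBytesWritable, Put> {
      @Override
      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
          sum += v.get();
        }
        String rowKey = key.toString();
        // The table name is built from the first part of the row key, e.g. "<appId>_clickcounts".
        String tableName = rowKey.split("_")[0] + "_clickcounts";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        // With MultiTableOutputFormat the output key carries the destination table name.
        context.write(new ImmutableBytesWritable(Bytes.toBytes(tableName)), put);
      }
    }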

    2. Writing to multiple HBase tables from Spark

    Here I will go straight to a Spark Streaming program I have tested that writes to several HBase tables. The code:

    import kafka.serializer.StringDecoder
    import net.sf.json.JSONObject
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // PropertiesUtil is the author's own configuration helper and is not shown here.
    object SparkStreamingWriteToHbase {
      def main(args: Array[String]): Unit = {
        var masterUrl = "yarn-client"
        if (args.length > 0) {
          masterUrl = args(0)
        }
        val conf = new SparkConf().setAppName("Write to several tables of Hbase").setMaster(masterUrl)
    
        val ssc = new StreamingContext(conf, Seconds(5))
    
        val topics = Set("app_events")
    
        val brokers = PropertiesUtil.getValue("BROKER_ADDRESS")
    
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
    
        val hbaseTableSuffix = "_clickcounts"
    
        val hConf = HBaseConfiguration.create()
        val zookeeper = PropertiesUtil.getValue("ZOOKEEPER_ADDRESS")
        hConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    
        val jobConf = new JobConf(hConf, this.getClass)
    
        val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        // Each Kafka record is a (key, message) pair; parse the JSON message body,
        // then build a "<appId>_<uid>" row key and pair it with the click count.
        val appUserClicks = kafkaDStreams.flatMap(record => {
          val data = JSONObject.fromObject(record._2)
          Some(data)
        }).map { jsonLine =>
          val key = jsonLine.getString("appId") + "_" + jsonLine.getString("uid")
          val value = jsonLine.getString("click_count")
          (key, value)
        }
    
        val result = appUserClicks.map { item =>
          val rowKey = item._1
          val value = item._2
          convertToHbasePut(rowKey, value, hbaseTableSuffix)
        }
    
        result.foreachRDD { rdd =>
          // The output path is unused by MultiTableOutputFormat, so an empty string is passed.
          rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put], classOf[MultiTableOutputFormat], jobConf)
        }
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      // Builds the (table name, Put) pair; the destination table is derived from the
      // appId prefix of the row key plus the configured suffix.
      def convertToHbasePut(key: String, value: String, tableNameSuffix: String): (ImmutableBytesWritable, Put) = {
        val rowKey = key
        val tableName = rowKey.split("_")(0) + tableNameSuffix
        val put = new Put(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value))
        (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
      }
    
    }

    Briefly: the Spark Streaming job processes JSON data read from Kafka, and the appId field is used to build the table name so that records are routed to different HBase tables. Finally the RDD is written to HBase with saveAsNewAPIHadoopFile.

    Stepping into saveAsNewAPIHadoopFile, you will find its configuration is essentially the same as the MapReduce setup:

    def saveAsNewAPIHadoopFile(
          path: String,
          keyClass: Class[_],
          valueClass: Class[_],
          outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
          conf: Configuration = self.context.hadoopConfiguration)
      {
        // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
        val hadoopConf = conf
        val job = new NewAPIHadoopJob(hadoopConf)
        job.setOutputKeyClass(keyClass)
        job.setOutputValueClass(valueClass)
        job.setOutputFormatClass(outputFormatClass)
        job.getConfiguration.set("mapred.output.dir", path)
        saveAsNewAPIHadoopDataset(job.getConfiguration)
      }

    The parameters of this method are the output path (we are writing to HBase, so an empty string is fine), followed by outputKeyClass, outputValueClass, outputFormatClass, and the job configuration.

    Make sure outputFormatClass is MultiTableOutputFormat so that writes can be routed to multiple tables. One last note: make sure the HBase tables you are writing to have already been created, for example as sketched below.
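
    A minimal sketch of pre-creating the destination tables with the HBase 1.x client API (the table names and the "info" column family are assumptions matching the example above):

    // Sketch: create the destination tables up front if they do not exist.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateClickCountTables {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
          // One table per appId, e.g. "app1_clickcounts", "app2_clickcounts" (illustrative names).
          for (String appId : new String[]{"app1", "app2"}) {
            TableName tableName = TableName.valueOf(appId + "_clickcounts");
            if (!admin.tableExists(tableName)) {
              HTableDescriptor descriptor = new HTableDescriptor(tableName);
              descriptor.addFamily(new HColumnDescriptor("info"));
              admin.createTable(descriptor);
            }
          }
        }
      }
    }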
