  • Importing HDFS data into HBase with MapReduce (Part 3)

    Use MapReduce to generate HFiles, then bulk-load them into an HBase table with LoadIncrementalHFiles (bulk loading writes the HFiles directly, bypassing the WAL).

    package com.mengyao.bigdata.hbase;
    
    import java.io.IOException;
    
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Generates HFiles with MapReduce and bulk-loads them into an HBase table.
     * Built against HBase-1.0.1.1 and Hadoop-2.6.0.
     *
     * @author mengyao
     */
    public class BulkLoadApp {

        private static Configuration conf = HBaseConfiguration.create();
        private static String inPath;
        private static String outPath;
        private static String tableName;

        static {
            conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
        }

        static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            private ImmutableBytesWritable row;

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                // id,username,email,birthday,mobile,phone,modified
                String[] fields = line.split(" ");
                String id = fields[0];
                String username = fields[1];
                String mail = fields[2];
                String birthday = fields[3];
                String mobile = fields[4];
                String phone = fields[5];
                String regtime = fields[6];

                // MD5 of the id spreads row keys evenly across regions
                String rowKey = DigestUtils.md5Hex(id);
                row = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
                Put put = new Put(Bytes.toBytes(rowKey), System.currentTimeMillis());
                if (!StringUtils.isEmpty(id)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(id));
                }
                if (!StringUtils.isEmpty(username)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username"), Bytes.toBytes(username));
                }
                if (!StringUtils.isEmpty(mail)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mail"), Bytes.toBytes(mail));
                }
                // skip empty or placeholder birthdays
                if (!StringUtils.isEmpty(birthday) && !birthday.equals("0000-00-00")) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes(birthday));
                }
                if (!StringUtils.isEmpty(mobile)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile"), Bytes.toBytes(mobile));
                }
                if (!StringUtils.isEmpty(phone)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone));
                }
                if (!StringUtils.isEmpty(regtime)) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("modified"), Bytes.toBytes(regtime));
                }
                context.write(row, put);
            }
        }

        static int createJob(String[] args) throws Exception {
            inPath = args[0];
            outPath = args[1];
            tableName = args[2];

            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));

            Job job = Job.getInstance(conf);
            job.setJarByClass(BulkLoadApp.class);
            job.setMapperClass(BulkLoadMapper.class);
            job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            // configureIncrementalLoad sets up total ordering and a reducer so the
            // generated HFiles match the table's current region boundaries
            HFileOutputFormat2.configureIncrementalLoad(job, table,
                    connection.getRegionLocator(TableName.valueOf(tableName)));
            FileInputFormat.addInputPath(job, new Path(inPath));
            FileOutputFormat.setOutputPath(job, new Path(outPath));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        /**
         * Usage:
         * 1. hadoop jar MyJar INPUT_FILE OUTPUT_DIR TABLE_NAME
         *    hadoop jar bigdata.jar /tag/data/user/haier_user.csv /tag/data/user/haier_user_out tbl_shopuser
         * 2. hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles OUTPUT_DIR TABLE_NAME
         *    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tag/data/user/haier_user_out tbl_shopuser
         * @param args
         * @throws Exception
         */
        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws Exception {
            if (args.length != 3) {
                System.out.println("Usage: " + BulkLoadApp.class.getName()
                        + " Input parameters <INPUT_PATH> <OUTPUT_PATH> <TABLE_NAME>");
            } else {
                int status = createJob(args);
                if (status == 0) {
                    // same as step 2 above, but done programmatically after the job succeeds
                    LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(conf);
                    loadHFiles.doBulkLoad(new Path(outPath), new HTable(conf, Bytes.toBytes(tableName)));
                }
                System.exit(status);
            }
        }
    }
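
    The job above assumes the target table already exists with an info column family, because configureIncrementalLoad derives the HFile partitioning from the table's current regions. Below is a minimal sketch of creating such a pre-split table with the HBase 1.0 Admin API; the helper class name CreateShopUserTable and the choice of split points are illustrative assumptions, not part of the original post. Since the row keys are MD5 hex strings, splitting on the first hex digit gives 16 roughly even regions.

    package com.mengyao.bigdata.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Illustrative helper (not from the original post): creates the target table before running BulkLoadApp. */
    public class CreateShopUserTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("tbl_shopuser"));
                desc.addFamily(new HColumnDescriptor("info"));
                // MD5 hex row keys start with 0-9a-f, so split points 1..f yield 16 regions
                String hexDigits = "123456789abcdef";
                byte[][] splits = new byte[hexDigits.length()][];
                for (int i = 0; i < splits.length; i++) {
                    splits[i] = Bytes.toBytes(hexDigits.substring(i, i + 1));
                }
                admin.createTable(desc, splits);
            }
        }
    }

    With a single-region (unsplit) table, configureIncrementalLoad would run only one reducer and write all HFiles for one region; pre-splitting lets both the HFile generation and the subsequent bulk load spread across the cluster.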
  • Original post: https://www.cnblogs.com/mengyao/p/6774046.html