zoukankan      html  css  js  c++  java
  • 使用MapReduce将HDFS数据导入到HBase(三)

    使用MapReduce生成HFile文件,通过BulkLoader方式(跳过WAL验证)批量加载到HBase表中

    package com.mengyao.bigdata.hbase;
    
    import java.io.IOException;
    
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 
     * @author mengyao
    * HBase-1.0.1.1、Hadoop-2.6.0 *
    */ public class BulkLoadApp { private static Configuration conf = HBaseConfiguration.create(); private static String inPath; private static String outPath; private static String tableName; static { conf.set("hbase.zookeeper.quorum", "bdata200,bdata202,bdata203"); conf.set("hbase.zookeeper.property.clientPort", "2181"); } static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private ImmutableBytesWritable row; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //id,username,email,birthday,mobile,phone,modified String[] fields = line.split(" "); String id = fields[0]; String username = fields[1]; String mail = fields[2]; String birthday = fields[3]; String mobile = fields[4]; String phone = fields[5]; String regtime = fields[6]; String rowKey = DigestUtils.md5Hex(id); row = new ImmutableBytesWritable(Bytes.toBytes(rowKey)); Put put = new Put(Bytes.toBytes(rowKey), System.currentTimeMillis()); if (!StringUtils.isEmpty(id)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(id)); } if (!StringUtils.isEmpty(username)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username"), Bytes.toBytes(username)); } if (!StringUtils.isEmpty(mail)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mail"), Bytes.toBytes(mail)); } if (!StringUtils.isEmpty(birthday) || !birthday.equals("0000-00-00")) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes(birthday)); } if (!StringUtils.isEmpty(mobile)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile"), Bytes.toBytes(mobile)); } if (!StringUtils.isEmpty(phone)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone)); } if (!StringUtils.isEmpty(regtime)) { put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("modified"), Bytes.toBytes(regtime)); } context.write(row, put); } } static int createJob(String[] args) throws Exception { inPath = args[0]; outPath = args[1]; tableName = args[2]; Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(tableName)); Job job=Job.getInstance(conf); job.setJarByClass(BulkLoadApp.class); job.setMapperClass(BulkLoadMapper.class); job.setNumReduceTasks(0); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(HFileOutputFormat2.class); HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(tableName))); FileInputFormat.addInputPath(job,new Path(inPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); return job.waitForCompletion(true)?0:1; } /** * use commond: * 1、hadoop jar MyJar INPUT_FILE OUTPUT_DIR TABLE_NAME * hadoop jar bigdata.jar /tag/data/user/haier_user.csv /tag/data/user/haier_user_out tbl_shopuser * 2、hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles OUTPUT_DIR TABLE_NAME * hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tag/data/user/haier_user_out tbl_shopuser * @param args * @throws Exception */ @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { if (args.length!=3) { System.out.println("Usage: "+BulkLoadApp.class.getName()+" Input paramters <INPUT_PATH> <OUTPUT_PATH> <TABLE_NAME>"); } else { int status = createJob(args); if (status == 0) { LoadIncrementalHFiles loadHFiles = new LoadIncrementalHFiles(conf); loadHFiles.doBulkLoad(new Path(outPath), new HTable(conf, Bytes.toBytes(tableName))); } System.exit(status); } } }
  • 相关阅读:
    vue 当前页跳转并强制刷新
    (转)vue项目刷新当前页面
    查询sqlserver中表信息
    (转) 自旋锁和互斥锁
    Web API 自定义文件内容的定制类
    (转)缓存
    (转) redis的事务和watch
    ASP.NET MVC , ASP.NET Web API 的路由系统与 ASP.NET 的路由系统是怎么衔接的?
    (转) 分布式系统关注点——99%的人都能看懂的「熔断」以及最佳实践
    php项目权限系统设计
  • 原文地址:https://www.cnblogs.com/mengyao/p/6774046.html
Copyright © 2011-2022 走看看