zoukankan      html  css  js  c++  java
  • hbase1

    http://blog.csdn.net/anhuidelinger/article/details/16989771

    import java.io.IOException;  
    import java.util.HashMap;  
      
    import org.apache.hadoop.conf.Configuration;  
    import org.apache.hadoop.hbase.HBaseConfiguration;  
    import org.apache.hadoop.hbase.client.Put;  
    import org.apache.hadoop.hbase.client.Result;  
    import org.apache.hadoop.hbase.client.Scan;  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
    import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;  
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Base64;  
    import org.apache.hadoop.hbase.util.Bytes;  
    import org.apache.hadoop.io.Writable;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.Mapper;  
    import org.apache.hadoop.util.GenericOptionsParser;  
    
    public class IndexBuilder {
    	// 索引的列族为info ,列为name
    	public static final byte[] column = Bytes.toBytes("info");
    	public static final byte[] qualifier = Bytes.toBytes("name");
    	
    	
    	public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
    		private byte[] family;
    		// indexes 存储了列与表对对应关系,其中byte[]用于获取列的值作为索引表的键值,ImmutableBytesWritable作为表的名称
    		private HashMap<byte[], ImmutableBytesWritable> indexes;
    		// 在Map中,对每一行的数据提取出需要建立索引的列的值,加入到索引表中输出。
    		protected void map(ImmutableBytesWritable rowKey, Result result,Context context) throws IOException, InterruptedException {
    			// original: row 123 attribute:phone 555-1212
    	        // index: row 555-1212 INDEX:ROW 123
    			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
    				// 获得列名 qualifier
    				byte[] qualifier = index.getKey();
    				// 索引表名 table
    				ImmutableBytesWritable table = index.getValue();
    				// 插入的列值为 列名加上行名
    				byte[] value = result.getValue(family, qualifier);
    				// 以列值作为行键,在列“info:row”中插入行键
    				Put put = new Put(value);
    				put.add(column, qualifier, rowKey.get());
    				// 在table表上执行put操作
    				context.write(table, (Writable) put);
    			}
    		}
    
    
    		protected void setup(Context context) throws IOException,InterruptedException {
    			Configuration configuration = context.getConfiguration();
    			String table = configuration.get("index.tablename");
    			String[] fields = configuration.getStrings("index.fields");
    			String familyName = configuration.get("index.familyname");
    			family = Bytes.toBytes(familyName);
    
    			// 初始化indexes   
    			// if the table is "people" and the field to index is "email", then the
    	        // index table will be called "people-email"
    			indexes = new HashMap<byte[], ImmutableBytesWritable>();
    			for (String field : fields) {
    				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
    						Bytes.toBytes(table + "-" + field)));
    			}
    		}
    	}
    	// Job configuration
    	public static Job configureJob(Configuration conf, String[] args)throws IOException {
    		String table = args[0];
    		String columnFamily = args[1];
    		System.out.println(table);
    		// 通过Configuration.set()方法传递参数
    		conf.set(TableInputFormat.SCAN, ScanToString(new Scan()));
    		conf.set(TableInputFormat.INPUT_TABLE, table);
    		conf.set("index.tablename", table);
    		conf.set("index.familyname", columnFamily);
    		String[] fields = new String[args.length - 2];
    		for (int i = 0; i < fields.length; i++) {
    			fields[i] = args[i + 2];
    		}
    		conf.setStrings("index.fields", fields);
    		conf.set("index.familyname", "attributes");
    		// 运行参数配置
    		Job job = new Job(conf, table);
    		job.setJarByClass(IndexBuilder.class);
    		job.setMapperClass(Map.class);
    		job.setNumReduceTasks(0);
    		job.setInputFormatClass(TableInputFormat.class);
    		job.setOutputFormatClass(MultiTableOutputFormat.class);
    		return job;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (otherArgs.length < 3)System.exit(-1);
    		Job job = configureJob(conf, otherArgs);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    	static String ScanToString(Scan scan) throws IOException {
    	    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    	    return Base64.encodeBytes(proto.toByteArray());
    	}
    }
    

      

  • 相关阅读:
    oracle 自增序列实现 可作为主键
    oracle 10 升级补丁
    软件设计原则
    oracle查询截至到当前日期月份所在年份的所有月份
    Dockerfile 基本命令
    Mybatis自动生成代码,MyBatis Generator
    Java垃圾回收机制
    SqlServer 导入 .sql 文件
    备份个清库脚本
    备份一些开发配置
  • 原文地址:https://www.cnblogs.com/shenbingyu/p/5025968.html
Copyright © 2011-2022 走看看