zoukankan      html  css  js  c++  java
  • hbase1

    http://blog.csdn.net/anhuidelinger/article/details/16989771

    import java.io.IOException;  
    import java.util.HashMap;  
      
    import org.apache.hadoop.conf.Configuration;  
    import org.apache.hadoop.hbase.HBaseConfiguration;  
    import org.apache.hadoop.hbase.client.Put;  
    import org.apache.hadoop.hbase.client.Result;  
    import org.apache.hadoop.hbase.client.Scan;  
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
    import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;  
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
    import org.apache.hadoop.hbase.util.Base64;  
    import org.apache.hadoop.hbase.util.Bytes;  
    import org.apache.hadoop.io.Writable;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.Mapper;  
    import org.apache.hadoop.util.GenericOptionsParser;  
    
    public class IndexBuilder {
    	// 索引的列族为info ,列为name
    	public static final byte[] column = Bytes.toBytes("info");
    	public static final byte[] qualifier = Bytes.toBytes("name");
    	
    	
    	public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
    		private byte[] family;
    		// indexes 存储了列与表对对应关系,其中byte[]用于获取列的值作为索引表的键值,ImmutableBytesWritable作为表的名称
    		private HashMap<byte[], ImmutableBytesWritable> indexes;
    		// 在Map中,对每一行的数据提取出需要建立索引的列的值,加入到索引表中输出。
    		protected void map(ImmutableBytesWritable rowKey, Result result,Context context) throws IOException, InterruptedException {
    			// original: row 123 attribute:phone 555-1212
    	        // index: row 555-1212 INDEX:ROW 123
    			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
    				// 获得列名 qualifier
    				byte[] qualifier = index.getKey();
    				// 索引表名 table
    				ImmutableBytesWritable table = index.getValue();
    				// 插入的列值为 列名加上行名
    				byte[] value = result.getValue(family, qualifier);
    				// 以列值作为行键,在列“info:row”中插入行键
    				Put put = new Put(value);
    				put.add(column, qualifier, rowKey.get());
    				// 在table表上执行put操作
    				context.write(table, (Writable) put);
    			}
    		}
    
    
    		protected void setup(Context context) throws IOException,InterruptedException {
    			Configuration configuration = context.getConfiguration();
    			String table = configuration.get("index.tablename");
    			String[] fields = configuration.getStrings("index.fields");
    			String familyName = configuration.get("index.familyname");
    			family = Bytes.toBytes(familyName);
    
    			// 初始化indexes   
    			// if the table is "people" and the field to index is "email", then the
    	        // index table will be called "people-email"
    			indexes = new HashMap<byte[], ImmutableBytesWritable>();
    			for (String field : fields) {
    				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
    						Bytes.toBytes(table + "-" + field)));
    			}
    		}
    	}
    	// Job configuration
    	public static Job configureJob(Configuration conf, String[] args)throws IOException {
    		String table = args[0];
    		String columnFamily = args[1];
    		System.out.println(table);
    		// 通过Configuration.set()方法传递参数
    		conf.set(TableInputFormat.SCAN, ScanToString(new Scan()));
    		conf.set(TableInputFormat.INPUT_TABLE, table);
    		conf.set("index.tablename", table);
    		conf.set("index.familyname", columnFamily);
    		String[] fields = new String[args.length - 2];
    		for (int i = 0; i < fields.length; i++) {
    			fields[i] = args[i + 2];
    		}
    		conf.setStrings("index.fields", fields);
    		conf.set("index.familyname", "attributes");
    		// 运行参数配置
    		Job job = new Job(conf, table);
    		job.setJarByClass(IndexBuilder.class);
    		job.setMapperClass(Map.class);
    		job.setNumReduceTasks(0);
    		job.setInputFormatClass(TableInputFormat.class);
    		job.setOutputFormatClass(MultiTableOutputFormat.class);
    		return job;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = HBaseConfiguration.create();
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (otherArgs.length < 3)System.exit(-1);
    		Job job = configureJob(conf, otherArgs);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    	static String ScanToString(Scan scan) throws IOException {
    	    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    	    return Base64.encodeBytes(proto.toByteArray());
    	}
    }
    

      

  • 相关阅读:
    eclipse中文乱码问题解决方案
    修改Tomcat的JDK目录
    Tomcat 5.5 修改服务器的侦听端口
    HTML DOM教程 27HTML DOM Button 对象
    HTML DOM教程 24HTML DOM Frameset 对象
    Navicat for MySQL v8.0.27 的注册码
    HTML DOM教程 25HTML DOM IFrame 对象
    Tomcat 5.5 的下载和安装
    android manifest相关属性
    ubuntu10.04 下 eclipse 小结
  • 原文地址:https://www.cnblogs.com/shenbingyu/p/5025968.html
Copyright © 2011-2022 走看看