zoukankan      html  css  js  c++  java
  • Hbase Client 使用示例

    这两天在读淘宝开源出来的DataX,想模仿它写一个离线数据交换组件。读了它读写HBase的插件的源代码,觉得写得确实比我之前写的好。整理出来,放在这里,向优秀代码学习。关键的地方是它在处理异常的时候考虑得比我周全很多。

    先是写Hbase的代码:

    /**
    
     * (C) 2010-2011 Alibaba Group Holding Limited.
    
     *
    
     * This program is free software; you can redistribute it and/or
    
     * modify it under the terms of the GNU General Public License 
    
     * version 2 as published by the Free Software Foundation. 
    
     * 
    
     */
    
    
    
    package com.taobao.datax.plugins.writer.hbasewriter;
    
    
    
    import org.apache.hadoop.conf.Configuration;
    
    import org.apache.hadoop.fs.Path;
    
    import org.apache.hadoop.hbase.HTableDescriptor;
    
    import org.apache.hadoop.hbase.client.*;
    
    import org.apache.hadoop.hbase.util.Bytes;
    
    
    
    import java.io.IOException;
    
    import java.util.ArrayList;
    
    import java.util.List;
    
    
    
    
    
    /**
     * Write-side helper around a single HBase table: it builds {@link Put}s row
     * by row, collects them in a client-side list and sends them to the table in
     * bulk once {@link #BUFFER_LINE} rows have accumulated.
     *
     * <p>Typical usage is {@link #prepare(byte[])}, one or more calls to
     * {@link #add(byte[], byte[], byte[])}, then {@link #insert()}; call
     * {@link #flush()} or {@link #close()} when done so the last partial batch
     * is written.
     *
     * <p>The class keeps mutable state (current put, pending batch, current
     * table) and performs no synchronization of its own.
     */
    public class HBaseProxy {

    	/** Number of pending puts that triggers a bulk write in {@link #insert()}. */
    	private static final int BUFFER_LINE = 1024;

    	private final Configuration config;

    	private final HBaseAdmin admin;

    	/** Puts accepted by {@link #insert()} that have not been sent to the table yet. */
    	private final List<Put> buffer = new ArrayList<Put>(BUFFER_LINE);

    	private HTable htable;

    	/** Schema of the current table, captured so {@link #truncateTable()} can re-create it. */
    	private HTableDescriptor descriptor;

    	/** The row currently being assembled; {@code null} until {@link #prepare(byte[])} is called. */
    	private Put p;

    	/**
    	 * Creates a proxy bound to one table.
    	 *
    	 * @param hbase_conf path of the HBase configuration file to load
    	 * @param table name of the table to write to
    	 * @return a new proxy for {@code table}
    	 * @throws IOException if the table or the admin connection cannot be opened
    	 */
    	public static HBaseProxy newProxy(String hbase_conf, String table)
    			throws IOException {
    		return new HBaseProxy(hbase_conf, table);
    	}

    	private HBaseProxy(String hbase_conf, String tableName)
    			throws IOException {
    		config = new Configuration();
    		config.addResource(new Path(hbase_conf));
    		htable = new HTable(config, tableName);
    		admin = new HBaseAdmin(config);
    		descriptor = htable.getTableDescriptor();
    	}

    	/**
    	 * Sets the write buffer size of the current table handle.
    	 *
    	 * @param bufferSize value passed to {@code HTable.setWriteBufferSize}
    	 * @throws IOException if the table rejects the new size
    	 */
    	public void setBufferSize(int bufferSize) throws IOException {
    		this.htable.setWriteBufferSize(bufferSize);
    	}

    	/**
    	 * Switches this proxy to another table.
    	 *
    	 * <p>Puts still pending for the current table are written to it first, the
    	 * cached descriptor is refreshed for the new table, and the previous table
    	 * handle is closed. Settings applied through {@link #setBufferSize(int)} or
    	 * {@link #setAutoFlush(boolean)} belong to the old handle and are not
    	 * carried over.
    	 *
    	 * @param tableName name of the table to switch to
    	 * @throws IOException if flushing the old table or opening the new one fails
    	 */
    	public void setHTable(String tableName) throws IOException {
    		// Pending rows were inserted while the old table was the target;
    		// without this they would silently land in the new table.
    		flush();

    		HTable next = new HTable(config, tableName);
    		HTableDescriptor nextDescriptor;
    		try {
    			nextDescriptor = next.getTableDescriptor();
    		} catch (IOException e) {
    			next.close();
    			throw e;
    		}

    		HTable previous = this.htable;
    		this.htable = next;
    		// Keep the descriptor in step with the table, otherwise truncateTable()
    		// would delete the new table and re-create the old table's schema.
    		this.descriptor = nextDescriptor;
    		previous.close();
    	}

    	/**
    	 * Turns auto-flush on or off for the current table handle.
    	 *
    	 * @param autoflush value passed to {@code HTable.setAutoFlush}
    	 */
    	public void setAutoFlush(boolean autoflush) {
    		this.htable.setAutoFlush(autoflush);
    	}

    	/**
    	 * Verifies that the master is running and that the current table exists,
    	 * is available and is enabled.
    	 *
    	 * @return {@code true} when every check passes (failures are reported by exception)
    	 * @throws IllegalStateException if any of the checks fails
    	 * @throws IOException if the cluster cannot be queried
    	 */
    	public boolean check() throws IOException {
    		if (!admin.isMasterRunning()) {
    			throw new IllegalStateException("hbase master is not running!");
    		}

    		byte[] tableName = htable.getTableName();
    		String prefix = "hbase table " + Bytes.toString(tableName);
    		if (!admin.tableExists(tableName)) {
    			throw new IllegalStateException(prefix + " is not existed!");
    		}
    		if (!admin.isTableAvailable(tableName)) {
    			throw new IllegalStateException(prefix + " is not available!");
    		}
    		if (!admin.isTableEnabled(tableName)) {
    			throw new IllegalStateException(prefix + " is disable!");
    		}

    		return true;
    	}

    	/**
    	 * Writes any pending puts, then releases the table handle and the admin
    	 * connection. Both are closed even when the final write fails.
    	 *
    	 * @throws IOException if the final write or either close fails
    	 */
    	public void close() throws IOException {
    		try {
    			// Up to BUFFER_LINE - 1 rows may still sit in the local batch;
    			// closing without sending them would lose data silently.
    			flush();
    		} finally {
    			try {
    				htable.close();
    			} finally {
    				// NOTE(review): relies on HBaseAdmin.close(), i.e. an HBase
    				// client version where HBaseAdmin is Closeable - confirm.
    				admin.close();
    			}
    		}
    	}

    	/**
    	 * Removes every row of the current table by scanning it and deleting row
    	 * by row. The table itself and its schema are kept.
    	 *
    	 * @throws IOException if scanning or deleting fails
    	 */
    	public void deleteTable() throws IOException {
    		ResultScanner scanner = htable.getScanner(new Scan());
    		try {
    			for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
    				htable.delete(new Delete(rr.getRow()));
    			}
    		} finally {
    			// Release the scanner even when a delete fails part-way through.
    			scanner.close();
    		}
    	}

    	/**
    	 * Empties the current table by disabling it, deleting it and creating it
    	 * again from the cached descriptor.
    	 *
    	 * @throws IOException if any of the admin operations fails
    	 */
    	public void truncateTable() throws IOException {
    		byte[] tableName = htable.getTableName();
    		admin.disableTable(tableName);
    		admin.deleteTable(tableName);
    		admin.createTable(descriptor);
    	}

    	/**
    	 * Sends all pending puts to the table and flushes the table's own commits.
    	 *
    	 * @throws IOException if the write fails
    	 */
    	public void flush() throws IOException {
    		writePending();
    		htable.flushCommits();
    	}

    	/**
    	 * Starts a new row; subsequent {@link #add(byte[], byte[], byte[])} calls
    	 * apply to it.
    	 *
    	 * @param rowKey key of the row to build
    	 */
    	public void prepare(byte[] rowKey) {
    		this.p = new Put(rowKey);
    	}

    	/**
    	 * Adds one cell to the row started by {@link #prepare(byte[])}.
    	 *
    	 * @param family column family
    	 * @param qualifier column qualifier
    	 * @param value cell value
    	 * @return the put being built
    	 * @throws IllegalStateException if {@link #prepare(byte[])} has not been called
    	 */
    	public Put add(byte[] family, byte[] qualifier, byte[] value) {
    		return currentPut("add()").add(family, qualifier, value);
    	}

    	/**
    	 * Queues the current row for writing; once {@link #BUFFER_LINE} rows are
    	 * queued they are sent to the table in one bulk call.
    	 *
    	 * @throws IllegalStateException if {@link #prepare(byte[])} has not been called
    	 * @throws IOException if the bulk write fails
    	 */
    	public void insert() throws IOException {
    		// Reject a missing row here: a null entry would otherwise make the
    		// whole batch fail later, far from the call that caused it.
    		buffer.add(currentPut("insert()"));
    		if (buffer.size() >= BUFFER_LINE) {
    			writePending();
    		}
    	}

    	/** Sends the pending batch to the table, if there is one, and empties it. */
    	private void writePending() throws IOException {
    		if (!buffer.isEmpty()) {
    			htable.put(buffer);
    			buffer.clear();
    		}
    	}

    	/** Returns the row under construction or fails if {@code prepare()} was never called. */
    	private Put currentPut(String caller) {
    		if (this.p == null) {
    			throw new IllegalStateException(
    					"prepare() must be called before " + caller);
    		}
    		return this.p;
    	}

    }
    

      

  • 相关阅读:
    (转载) mysql中,option是保留字段么?
    (转载)腾讯CMEM的PHP扩展
    (转载)一句简单命令重启nginx
    (转载)四种常见的 POST 提交数据方式
    (转载)完美解决PHP中文乱码问题
    (转载)file_get_contents("php://input")
    (转载)PHP 下 CURL 通过 POST 提交表单失败的原因之一与解决办法
    (转载)php array_merge 和 两数组相加区别
    Immutable-不变模式与不变类
    zookeeper 编程框架 curator
  • 原文地址:https://www.cnblogs.com/qgxiaoguang/p/2921045.html
Copyright © 2011-2022 走看看