  • [HBase Part 3] Operating HBase from Java and Python


    Operating HBase from Python

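    HBase is written in Java, so operating on it from Python requires a cross-language tool such as Thrift, which makes the implementation language transparent: HBase's Thrift server exposes the HBase API over RPC, and the Python client talks to that server.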

    Prerequisites for installing Thrift

    1. wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz

    2. tar xzf thrift-0.8.0.tar.gz

    3. yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel

    4. yum install boost-devel.x86_64

    5. yum install libevent-devel.x86_64

    Installing Thrift

    1. Change into the directory where Thrift was extracted (thrift-0.8.0)

    2. Run: ./configure --with-cpp=no --with-ruby=no

    3. Run: make

    4. Run: make install
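    After make install, the Thrift Python library should be importable under the package name thrift (the name the scripts below import). A minimal sketch for checking that from the interpreter that will run the scripts; module_available is a hypothetical helper, not part of Thrift, and it simply attempts the import.

```python
def module_available(name):
    """Return True if `name` can be imported by the current interpreter.

    Hypothetical helper: it tries the import and reports ImportError as False.
    Works on both Python 2 and Python 3.
    """
    try:
        __import__(name)
    except ImportError:
        return False
    return True
```

    For example, module_available("thrift") should return True once the Thrift Python library is installed; if it returns False, the later "from thrift import Thrift" lines will fail for the same reason.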

    Generating the Python API for HBase

    • Download the HBase source: wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz

    • In the source directory, locate the Thrift definition file for HBase: find . -name Hbase.thrift. It is located at: ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

    • Change into that directory: cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/

    • Run: thrift -gen py Hbase.thrift. This generates the Python modules for HBase in a gen-py directory.

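    • Copy the hbase directory under gen-py into the directory where your Python scripts will live. Run this from the directory that contains gen-py: cp -raf gen-py/hbase/ /test/hbase_test (if /test/hbase_test already exists, this creates /test/hbase_test/hbase)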
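    The find . -name Hbase.thrift step can also be done from Python, which is handy when scripting the setup. A minimal sketch using only os.walk; find_files is a hypothetical helper and only matches an exact file name (no wildcards, unlike find).

```python
import os


def find_files(root, filename):
    """Walk `root` recursively and return sorted paths of files named exactly `filename`.

    Roughly equivalent to `find <root> -name <filename>` for a literal name.
    """
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        if filename in filenames:
            matches.append(os.path.join(dirpath, filename))
    return sorted(matches)
```

    Called as find_files(".", "Hbase.thrift") from the root of the extracted HBase source tree, it should return the same path that find reports. The name comparison is case-sensitive, as with find -name.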

    Starting the Thrift server

    • Command: hbase-daemon.sh start thrift

    • Check that the Thrift server is listening on port 9090 (its default port)
      Command: netstat -antup | grep 9090
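    netstat only shows the port on the server itself. To check reachability from the machine that will run the Python scripts, a plain TCP connect is enough. A minimal sketch using the standard socket module; port_is_open is a hypothetical helper, and a successful connect only proves something is listening, not that it is the HBase Thrift server.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect((host, port))
        return True
    except (socket.error, socket.timeout):
        # Refused, timed out, or the host name could not be resolved
        return False
    finally:
        sock.close()
```

    For the setup in this post that would be port_is_open("master", 9090), where "master" is the host name the scripts below connect to.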

    Running Python scripts against HBase

    • Create create_table.py to create a table:
    from __future__ import print_function
    
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    from hbase import Hbase
    from hbase.ttypes import *
    
    # Connect to the HBase Thrift server on host "master", port 9090
    transport = TSocket.TSocket('master', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)
    transport.open()
    
    try:
        # Two column families, written as "family:", each keeping one version
        base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1)
        other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1)
    
        # Raises AlreadyExists if the table has already been created
        client.createTable('new_music_table', [base_info_contents, other_info_contents])
    
        print(client.getTableNames())
    finally:
        transport.close()
    

    Run the script: python create_table.py
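    Every script in this section repeats the same open() call on the transport, and it is easy to forget the matching close(). A minimal sketch of a context manager that pairs them, assuming only that the transport object has open() and close() methods (as the TBufferedTransport built in create_table.py does); opened is a hypothetical helper, not part of Thrift.

```python
from contextlib import contextmanager


@contextmanager
def opened(transport):
    """Open a Thrift-style transport and always close it, even if the body raises.

    Works with any object exposing open() and close().
    """
    transport.open()
    try:
        yield transport
    finally:
        transport.close()
```

    Usage would be "with opened(transport): client.createTable(...)" in place of a separate transport.open() call and a try/finally block.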

    • Create insert_data.py to insert data:
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    from hbase import Hbase
    from hbase.ttypes import *
    
    transport = TSocket.TSocket('master', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)
    transport.open()
    
    tableName = 'new_music_table'
    rowKey = '1100'
    
    # Each Mutation writes one cell, addressed as "family:qualifier"
    mutations = [Mutation(column="meta-data:name", value="wangqingshui"),
                 Mutation(column="meta-data:tag", value="pop"),
                 Mutation(column="flags:is_valid", value="TRUE")]
    
    try:
        # The last argument is the optional attributes map; None sends no attributes
        client.mutateRow(tableName, rowKey, mutations, None)
    finally:
        transport.close()
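    When a row has more than a few cells, building the mutation list from a dict keeps the script shorter. A minimal sketch; build_mutations is a hypothetical helper, and the factory parameter stands in for hbase.ttypes.Mutation so the helper can be tried without the generated bindings.

```python
def build_mutations(cells, mutation_factory):
    """Turn {"family:qualifier": value} into a list of mutations.

    `mutation_factory` is called as mutation_factory(column=..., value=...);
    with the generated HBase bindings you would pass hbase.ttypes.Mutation.
    Columns are sorted so the resulting order is deterministic.
    """
    return [mutation_factory(column=column, value=value)
            for column, value in sorted(cells.items())]
```

    With the bindings available, the list above becomes build_mutations({"meta-data:name": "wangqingshui", "meta-data:tag": "pop", "flags:is_valid": "TRUE"}, Mutation).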
    
    • Create get_one_line.py to read a single row:
    from __future__ import print_function
    
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    from hbase import Hbase
    from hbase.ttypes import *
    
    transport = TSocket.TSocket('master', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)
    transport.open()
    
    tableName = 'new_music_table'
    rowKey = '1100'
    
    try:
        # getRow returns a list of TRowResult; it is empty if the row does not exist
        result = client.getRow(tableName, rowKey, None)
    
        for r in result:
            # r.columns maps "family:qualifier" to a TCell
            print('the row is', r.row)
            print('the name is', r.columns.get('meta-data:name').value)
            print('the flag is', r.columns.get('flags:is_valid').value)
    finally:
        transport.close()
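    In get_one_line.py, r.columns.get(...) returns None for a column the row does not have, so the following .value raises AttributeError. A minimal sketch of a safer accessor, assuming only that columns is the dict from TRowResult.columns (column name to TCell); cell_value is a hypothetical helper.

```python
def cell_value(columns, column, default=None):
    """Return columns[column].value, or `default` if the column is absent.

    `columns` is the dict in TRowResult.columns ("family:qualifier" -> TCell).
    """
    cell = columns.get(column)
    return default if cell is None else cell.value
```

    For example, cell_value(r.columns, 'flags:is_valid', 'unknown') prints a placeholder instead of crashing when the flag was never written.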
    
    • Create scan_many_lines.py to query HBase with a scan:
    from __future__ import print_function
    
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    from hbase import Hbase
    from hbase.ttypes import *
    
    transport = TSocket.TSocket('master', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)
    transport.open()
    
    tableName = 'new_music_table'
    
    try:
        scan = TScan()  # an empty TScan covers the whole table
        scanner_id = client.scannerOpenWithScan(tableName, scan, None)
        # Fetch at most 10 rows; call scannerGetList again for the next batch
        result = client.scannerGetList(scanner_id, 10)
    
        for r in result:
            print('======')
            print('the row is', r.row)
    
            for k, v in r.columns.items():
                print('\t'.join([k, v.value]))
    
        # Release the scanner on the server side
        client.scannerClose(scanner_id)
    finally:
        transport.close()
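    scan_many_lines.py reads only the first 10 rows. scannerGetList returns an empty list once the scanner is exhausted, so reading everything means calling it in a loop. A minimal sketch of that loop; scan_all is a hypothetical helper, and its three callables stand in for client.scannerOpenWithScan, client.scannerGetList and client.scannerClose so it can be tested without a server.

```python
def scan_all(open_scanner, get_batch, close_scanner, batch_size=10):
    """Yield every row from a Thrift scanner, fetching `batch_size` rows per call.

    An empty batch means the scanner is exhausted. The scanner is closed when
    iteration finishes, raises, or the generator is closed early.
    """
    scanner_id = open_scanner()
    try:
        while True:
            batch = get_batch(scanner_id, batch_size)
            if not batch:
                break
            for row in batch:
                yield row
    finally:
        close_scanner(scanner_id)
```

    With a live client this would be scan_all(lambda: client.scannerOpenWithScan(tableName, TScan(), None), client.scannerGetList, client.scannerClose). Because it is a generator, the close happens only after iteration ends (or the generator is closed), so consume it fully or close it explicitly.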
    

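    Module locations

    hbase >> python, and thrift >> python: the generated hbase package and the Thrift Python library (thrift) must both be importable by the Python interpreter that runs the scripts.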

    Operating HBase from Java

    Writing records to HBase

    package com.cxqy.baseoperation;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbasePutOneRecord {
    
    	// Assumes user_action_table with column family action_log already exists
    	public static final String TableName = "user_action_table";
    	public static final String ColumnFamily = "action_log";
    
    	public static Configuration conf = HBaseConfiguration.create();
    
    	public static void addOneRecord(String tableName, String rowKey, String family, String qualifier, String value)
    			throws IOException {
    		// try-with-resources closes the table when done (HTable is Closeable)
    		try (HTable table = new HTable(conf, tableName)) {
    			Put put = new Put(Bytes.toBytes(rowKey));
    			// HBase 0.98 API; HBase 1.0+ deprecates add() in favor of addColumn()
    			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
    			table.put(put);
    		}
    		System.out.println("insert record " + rowKey + " to table " + tableName + " success");
    	}
    
    	public static void main(String[] args) throws IOException {
    
    		// The client locates the cluster through the ZooKeeper quorum
    		conf.set("hbase.master", "192.168.87.200:60000");
    		conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
    
    		try {
    			addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "ip", "192.168.87.101");
    			addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "userid", "1100");
    			addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "ip", "192.168.1.201");
    			addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "userid", "1200");
    			addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "ip", "192.168.3.201");
    			addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "userid", "1300");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
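    The row keys above (ip=192.168.87.200-001, -002, -003) end in a zero-padded sequence number. HBase sorts row keys as raw bytes, so without padding "-10" would sort before "-2". A minimal sketch of a key builder in Python for illustration; make_row_key and its width parameter are hypothetical, mirroring the key format used in the Java code.

```python
def make_row_key(ip, seq, width=3):
    """Build a row key like "ip=192.168.87.200-001".

    The sequence number is zero-padded to `width` digits so that byte-wise
    (lexicographic) ordering, which is how HBase orders row keys, matches
    numeric ordering. Numbers wider than `width` are not truncated.
    """
    return "ip={0}-{1:0{2}d}".format(ip, seq, width)
```

    Choosing the width up front matters: once more than 999 sequence numbers exist for one IP, a width of 3 no longer keeps the order numeric.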
    

    Reading a record from HBase

    package com.cxqy.baseoperation;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseGetOneRecord {
    
    	public static final String TableName = "user_action_table";
    	public static final String ColumnFamily = "action_log";
    
    	public static Configuration conf = HBaseConfiguration.create();
    
    	public static void selectRowKey(String tablename, String rowKey) throws IOException {
    		try (HTable table = new HTable(conf, tablename)) {
    			// Encode the key with Bytes.toBytes (UTF-8), the same way it was written
    			Get g = new Get(Bytes.toBytes(rowKey));
    			Result rs = table.get(g);
    
    			// For a missing row, getRow() returns null, so check first
    			if (rs.isEmpty()) {
    				System.out.println("row " + rowKey + " not found");
    				return;
    			}
    
    			System.out.println("==> " + Bytes.toString(rs.getRow()));
    
    			for (Cell kv : rs.rawCells()) {
    				System.out.println("--------------------" + Bytes.toString(CellUtil.cloneRow(kv)) + "----------------------------");
    				System.out.println("Column Family: " + Bytes.toString(CellUtil.cloneFamily(kv)));
    				System.out.println("Column       : " + Bytes.toString(CellUtil.cloneQualifier(kv)));
    				System.out.println("value        : " + Bytes.toString(CellUtil.cloneValue(kv)));
    			}
    		}
    	}
    
    	public static void main(String[] args) throws IOException {
    
    		conf.set("hbase.master", "192.168.87.200:60000");
    		conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
    
    		try {
    			selectRowKey(TableName, "ip=192.168.87.200-003");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    Deleting a record from HBase

    package com.cxqy.baseoperation;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseDelOneRecord {
    
    	public static final String TableName = "user_action_table";
    	public static final String ColumnFamily = "action_log";
    
    	public static Configuration conf = HBaseConfiguration.create();
    
    	public static void delOneRecord(String tableName, String rowKey) throws IOException {
    		try (HTable table = new HTable(conf, tableName)) {
    			// table.delete accepts a List<Delete>, so several rows can be removed in one call
    			List<Delete> list = new ArrayList<Delete>();
    			// A Delete built from just the row key removes every cell in that row
    			list.add(new Delete(Bytes.toBytes(rowKey)));
    			table.delete(list);
    		}
    		System.out.println("delete record " + rowKey + " success!");
    	}
    
    	public static void main(String[] args) throws IOException {
    
    		conf.set("hbase.master", "192.168.87.200:60000");
    		conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202");
    
    		try {
    			delOneRecord(TableName, "ip=192.168.87.200-002");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    Reading multiple records from HBase (scans)

    package com.cxqy.baseoperation;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HbaseScanManyRecords {
    
    	public static final String TableName = "user_action_table";
    	public static final String ColumnFamily = "action_log";
    
    	public static Configuration conf = HBaseConfiguration.create();
    
    	// Runs a scan and prints each cell as "row family:qualifier timestamp value";
    	// try-with-resources closes both the scanner and the table
    	private static void printScan(String tableName, Scan scan) throws IOException {
    		try (HTable table = new HTable(conf, tableName);
    				ResultScanner scanner = table.getScanner(scan)) {
    			for (Result result : scanner) {
    				System.out.println("===============");
    				for (Cell kv : result.rawCells()) {
    					System.out.print(Bytes.toString(CellUtil.cloneRow(kv)) + " ");
    					System.out.print(Bytes.toString(CellUtil.cloneFamily(kv)) + ":");
    					System.out.print(Bytes.toString(CellUtil.cloneQualifier(kv)) + " ");
    					System.out.print(kv.getTimestamp() + " ");
    					System.out.println(Bytes.toString(CellUtil.cloneValue(kv)));
    				}
    			}
    		}
    	}
    
    	// Full-table scan
    	public static void getManyRecords(String tableName) throws IOException {
    		printScan(tableName, new Scan());
    	}
    
    	// Keeps only the row whose key equals rowKey
    	public static void getManyRecordsWithFilter(String tableName, String rowKey) throws IOException {
    		Scan scan = new Scan();
    		// To narrow the scanned range, set a start row (inclusive) and stop row (exclusive):
    //		scan.setStartRow(Bytes.toBytes("ip=10.11.1.2-996"));
    //		scan.setStopRow(Bytes.toBytes("ip=10.11.1.2-997"));
    		// Without start/stop rows the filter is evaluated against every row in the table
    		Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKey)));
    		scan.setFilter(filter);
    		printScan(tableName, scan);
    	}
    
    	// Keeps rows whose key equals any key in rowKeyList
    	public static void getManyRecordsWithFilter(String tableName, List<String> rowKeyList) throws IOException {
    		List<Filter> filters = new ArrayList<Filter>();
    		for (String rowKey : rowKeyList) {
    			filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKey))));
    		}
    		// MUST_PASS_ONE combines the filters with a logical OR
    		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
    		Scan scan = new Scan();
    		scan.setFilter(filterList);
    		printScan(tableName, scan);
    	}
    
    	public static void main(String[] args) throws IOException {
    
    		// Adjust these addresses to your cluster
    		conf.set("hbase.master", "192.168.159.30:60000");
    		conf.set("hbase.zookeeper.quorum", "192.168.159.30,192.168.159.31,192.168.159.32");
    
    		try {
    //			getManyRecords(TableName);
    //			getManyRecordsWithFilter(TableName, "ip=192.11.1.200-0");
    
    			ArrayList<String> whiteRowKeyList = new ArrayList<>();
    			whiteRowKeyList.add("ip=192.168.87.200-001");
    			whiteRowKeyList.add("ip=192.168.87.200-003");
    			getManyRecordsWithFilter(TableName, whiteRowKeyList);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
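    The scan class mixes two ways of selecting rows: a start/stop row range (the commented-out setStartRow/setStopRow lines) and a FilterList of RowFilter(EQUAL) filters joined with MUST_PASS_ONE. A minimal in-memory model of those semantics, written in Python for illustration only; scan_rows is hypothetical, compares Python strings where HBase compares raw bytes (equivalent for ASCII keys like these), and does not model server-side cost.

```python
import bisect


def scan_rows(sorted_keys, start_row=None, stop_row=None, row_keys_any=None):
    """In-memory model of an HBase row-key scan over `sorted_keys` (must be sorted).

    start_row is inclusive and stop_row is exclusive, like Scan.setStartRow /
    Scan.setStopRow. If `row_keys_any` is given, a row is kept when it equals
    ANY of those keys, which is what a FilterList(MUST_PASS_ONE) of
    RowFilter(EQUAL) filters selects.
    """
    lo = 0 if start_row is None else bisect.bisect_left(sorted_keys, start_row)
    hi = len(sorted_keys) if stop_row is None else bisect.bisect_left(sorted_keys, stop_row)
    wanted = None if row_keys_any is None else set(row_keys_any)
    return [k for k in sorted_keys[lo:hi] if wanted is None or k in wanted]
```

    The model makes the design trade-off visible: the range bounds shrink the slice that is examined at all, while the filter only discards rows inside that slice. That is why a filter-only scan over a large table still reads every row.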
    
  • Original post: https://www.cnblogs.com/screen/p/9110081.html