  • HBase access methods

    Only the three most commonly used access methods are covered here.

    1. HBase shell

    The HBase shell is the simplest interface, a command-line tool used mainly for administering HBase.

    First, start HBase.
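
    For a typical installation this is done from the HBase home directory, after which the shell can be opened (the paths are illustrative; adjust them to your environment):

    bin/start-hbase.sh
    bin/hbase shell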

     

    Help

    hbase(main):001:0> help

    Check the HBase server status

    hbase(main):001:0> status

     

    Check the HBase version

    hbase(main):002:0> version

     

    DDL operations

    1. Create a table (here a table named table1 with three column families)

    hbase(main):013:0> create 'table1','tab1_id','tab1_add','tab1_info'

     

    2. List all tables

    hbase(main):006:0> list

     

    3. View a table's structure

    hbase(main):007:0> describe 'member'

     

    4. Delete a column family
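
    A column family is removed with alter; for example, dropping an address family from the member table (the table and family names follow the examples used later in this post and are only illustrative):

    alter 'member', {NAME => 'address', METHOD => 'delete'}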

     

    5. Check whether a table exists
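
    For example:

    exists 'member'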

    6. Check whether a table is enabled
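
    For example:

    is_enabled 'member'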

    7. Drop a table
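
    A table must be disabled before it can be dropped; for example:

    disable 'member'
    drop 'member'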

    DML operations

    1. Create the member table

    (Keep the number of column families small, generally no more than two; a family that is not needed can be deleted as shown in the DDL section above.)
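
    A sketch of the create command, assuming the info and address column families used in the Java example later in this post:

    create 'member', 'info', 'address'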

    2. Insert data into the member table
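
    For example, inserting a few cells for the row key djt (the row key, columns, and values mirror the Java example below):

    put 'member', 'djt', 'info:age', '28'
    put 'member', 'djt', 'info:company', 'dajiang'
    put 'member', 'djt', 'address:city', 'beijing'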

    3. Scan the table
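
    For example:

    scan 'member'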

    4. Get data

    Get all data for a row key

    Get all data in one column family for a row key

    Get the data of a single column within a column family for a row key
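
    The three cases above map to the following get commands (the row key and column names follow the earlier examples):

    get 'member', 'djt'
    get 'member', 'djt', 'info'
    get 'member', 'djt', 'info:age'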

    5. Update data
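
    An update is simply another put to the same cell, which writes a new version; for example:

    put 'member', 'djt', 'info:age', '29'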

    6. Delete a single column from a column family
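
    For example:

    delete 'member', 'djt', 'info:age'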

    7. Count the total number of rows in the table
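
    For example:

    count 'member'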

    8. Empty the table
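
    truncate disables, drops, and recreates the table; for example:

    truncate 'member'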

    2. Java API

    The most common and most efficient way to access HBase.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HbaseTest {
        public static Configuration conf;
        static {
            conf = HBaseConfiguration.create(); // Step 1: create the configuration
            conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.master", "header-1:60000");
        }

        public static void main(String[] args) throws IOException {
            //createTable("member");
            //insertDataByPut("member");
            //QueryByGet("member");
            QueryByScan("member");
            //DeleteData("member");
        }

        /**
         * Create a table via an HBaseAdmin object.
         *
         * @param tableName
         * @throws IOException
         * @throws ZooKeeperConnectionException
         * @throws MasterNotRunningException
         */
        public static void createTable(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
            // Create the HBaseAdmin object
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            // If the table already exists, delete it first
            if (hBaseAdmin.tableExists(tableName)) {
                hBaseAdmin.disableTable(tableName);
                hBaseAdmin.deleteTable(tableName);
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            // Add the column families
            tableDescriptor.addFamily(new HColumnDescriptor("info"));
            tableDescriptor.addFamily(new HColumnDescriptor("address"));
            // Create the table
            hBaseAdmin.createTable(tableDescriptor);
            // Release resources
            hBaseAdmin.close();
        }

        /**
         * Insert data with Put.
         *
         * @param tableName
         * @throws IOException
         */
        @SuppressWarnings("deprecation")
        public static void insertDataByPut(String tableName) throws IOException {
            // Step 2: get a table handle from the static configuration and the table name
            HTable table = new HTable(conf, tableName);

            // Build a Put with the row key and data; getBytes converts the strings to byte arrays
            Put put1 = new Put(getBytes("djt"));
            put1.add(getBytes("address"), getBytes("country"), getBytes("china"));
            put1.add(getBytes("address"), getBytes("province"), getBytes("beijing"));
            put1.add(getBytes("address"), getBytes("city"), getBytes("beijing"));

            put1.add(getBytes("info"), getBytes("age"), getBytes("28"));
            put1.add(getBytes("info"), getBytes("birthday"), getBytes("1998-12-23"));
            put1.add(getBytes("info"), getBytes("company"), getBytes("dajiang"));

            // Step 3: write the Put to the table
            table.put(put1);

            // Release resources
            table.close();
        }

        /**
         * Query a single record.
         *
         * @param tableName
         * @throws IOException
         */
        public static void QueryByGet(String tableName) throws IOException {
            // Step 2: get a table handle
            HTable table = new HTable(conf, tableName);
            // Query by row key
            Get get = new Get(getBytes("djt"));
            Result r = table.get(get);
            System.out.println("row key: " + new String(r.getRow()));
            for (KeyValue keyvalue : r.raw()) {
                System.out.println("family: " + new String(keyvalue.getFamily())
                        + "====qualifier: " + new String(keyvalue.getQualifier())
                        + "====value: " + new String(keyvalue.getValue()));
            }
            table.close();
        }

        /**
         * Scan the table.
         *
         * @param tableName
         * @throws IOException
         */
        public static void QueryByScan(String tableName) throws IOException {
            // Step 2: get a table handle
            HTable table = new HTable(conf, tableName);
            Scan scan = new Scan();
            // Restrict the scan to one family and column; without this it is a full-table scan
            scan.addColumn(getBytes("info"), getBytes("company"));
            ResultScanner scanner = table.getScanner(scan);
            for (Result r : scanner) {
                System.out.println("row key: " + new String(r.getRow()));
                for (KeyValue kv : r.raw()) {
                    System.out.println("family: " + new String(kv.getFamily())
                            + "====qualifier: " + new String(kv.getQualifier())
                            + "====value: " + new String(kv.getValue()));
                }
            }
            // Release resources
            scanner.close();
            table.close();
        }

        /**
         * Delete one column of a row.
         *
         * @param tableName
         * @throws IOException
         */
        public static void DeleteData(String tableName) throws IOException {
            // Step 2: get a table handle
            HTable table = new HTable(conf, tableName);

            Delete delete = new Delete(getBytes("djt"));
            delete.addColumn(getBytes("info"), getBytes("age"));

            table.delete(delete);
            // Release resources
            table.close();
        }

        /**
         * Convert a String to a byte array (null becomes the empty string).
         */
        public static byte[] getBytes(String str) {
            if (str == null)
                str = "";
            return Bytes.toBytes(str);
        }
    }

    3. MapReduce

    HBase data can also be processed directly with MapReduce jobs. The first example below imports data from HDFS into HBase (a word count whose result is written to an HBase table); the second reads that table back out of HBase and writes it to HDFS.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    /**
     * Import data from HDFS into HBase.
     */
    public class MapReduceWriteHbaseDriver {

        public static class WordCountMapperHbase extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    // The key written towards HBase must be an ImmutableBytesWritable
                    context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
                }
            }
        }

        public static class WordCountReducerHbase extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
            public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                // Instantiate a Put; the key is the row key, so each word gets its own row
                Put put = new Put(key.get());
                // The three arguments are: column family "content", column "count", and the word frequency as the value
                put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
                context.write(key, put);
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String tableName = "wordcount"; // HBase table name; it could also be passed in through args
            Configuration conf = HBaseConfiguration.create(); // instantiate the Configuration
            conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.master", "header-1");

            // If the table already exists, delete it first
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }

            HTableDescriptor htd = new HTableDescriptor(tableName); // table name
            HColumnDescriptor hcd = new HColumnDescriptor("content"); // column family name
            htd.addFamily(hcd); // add the column family
            admin.createTable(htd); // create the table

            Job job = new Job(conf, "import from hdfs to hbase");
            job.setJarByClass(MapReduceWriteHbaseDriver.class);

            job.setMapperClass(WordCountMapperHbase.class);

            // Configure the reducer that writes into HBase
            TableMapReduceUtil.initTableReducerJob(tableName, WordCountReducerHbase.class, job, null, null, null, null, false);

            // Map output types
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Reduce output types
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);

            // Input path
            FileInputFormat.addInputPaths(job, "hdfs://header-1:9000/user/test.txt");
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    The second example reads the wordcount table back out of HBase and writes it to HDFS:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Read HBase data and store it in HDFS.
     */
    public class MapReduceReaderHbaseDriver {

        public static class WordCountHBaseMapper extends TableMapper<Text, Text> {
            protected void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException {
                StringBuffer sb = new StringBuffer("");
                // Collect the values stored under the content column family
                for (java.util.Map.Entry<byte[], byte[]> value : values.getFamilyMap("content".getBytes()).entrySet()) {
                    String str = new String(value.getValue());
                    if (str != null) {
                        sb.append(str);
                    }
                }
                // Emit one record per row: the row key and the concatenated values
                context.write(new Text(key.get()), new Text(sb.toString()));
            }
        }

        public static class WordCountHBaseReducer extends Reducer<Text, Text, Text, Text> {
            private Text result = new Text();

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                for (Text val : values) {
                    result.set(val);
                    context.write(key, result);
                }
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String tableName = "wordcount"; // table name
            Configuration conf = HBaseConfiguration.create(); // instantiate the Configuration
            conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.master", "header-1:60000");

            Job job = new Job(conf, "import from hbase to hdfs");
            job.setJarByClass(MapReduceReaderHbaseDriver.class);

            job.setReducerClass(WordCountHBaseReducer.class);
            // Configure the mapper that reads from HBase
            TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WordCountHBaseMapper.class, Text.class, Text.class, job, false);

            // Output path
            FileOutputFormat.setOutputPath(job, new Path("hdfs://header-1:9000/user/out"));
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        }
    }
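
    To try these jobs out, one typical workflow (the jar name here is a placeholder, not from the original post) is to package both driver classes into a jar, put an input file at the HDFS path used above, then run the import job followed by the export job:

    hadoop jar hbase-mr-examples.jar MapReduceWriteHbaseDriver
    hadoop jar hbase-mr-examples.jar MapReduceReaderHbaseDriver

    The imported result can also be checked from the HBase shell with scan 'wordcount'. Note that both drivers pass addDependencyJars = false to TableMapReduceUtil, so the HBase client jars must already be on the cluster classpath (for example via HADOOP_CLASSPATH).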