zoukankan      html  css  js  c++  java
  • HBase java api

    一、说在前面:这是借鉴邹老师的哦!

    1、配置文件

    hbase.site.xml

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!--
    /**
     *
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    -->
    <configuration>
    <property>     
            <name>hbase.rootdir</name>     
            <value>hdfs://hadoop100:9000/HBase</value>
        </property>
    
        <property>   
            <name>hbase.cluster.distributed</name>
            <value>true</value>
        </property>
    
        <property>    
            <name>hbase.zookeeper.quorum</name>
             <value>hadoop100:2181,hadoop101:2181,hadoop102:2181</value>
        </property>
    
        <property>   
            <name>hbase.zookeeper.property.dataDir</name>
             <value>/opt/module/zookeeper-3.4.12/datas</value>
        </property>
    
    </configuration>
    View Code

    2、获取连接操作:

    package com.me.hbase.utils;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    
    /*
     * 1.创建和关闭Connection对象
     * 
     * 2.如何在HBase中创建一个Configuration对象
     *         可以使用HBaseConfiguration.create(),返回的Configuration,既包含
     *         hadoop8个配置文件的参数,又包含hbase-default.xml和hbase-site.xml中所有的参数配置!
     */
    public class ConnectionUtil {
        
        //创建一个Connection对象
        public static Connection getConn() {
    
            try {
                return ConnectionFactory.createConnection();
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
    
        }
        
        
        public static void close(Connection conn) throws IOException {
            
            if (conn !=null) {
                conn.close();
            }
            
        }
        
        
    
    }
    View Code

    3、namespace相关操作

    package com.me.hbase.utils;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.NamespaceDescriptor;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /*
     * 1. 创建/删除/查询/判断是否存在   名称空间
     *             
     *         hbase shell :  开启一个客户端对象
     *         hbase shell : create_namespace 库名
     * 
     * 2. Admin :  提供对hbase管理的一些api!
     *                例如创建,删除,查询表等!
     *                 可以使用Connection.getAdmin()来获取Admin的一个实例,使用完成后,调用
     *                 close关闭!
     * 
     * 3. Connection:  Connection代表客户端和集群的一个连接!这个连接包含对master的连接,和zk的连接!
     *                   Connection可以使用ConnectionFactory来创建!
     *                   Connection的创建是重量级的,因此建议一个应用只创建一个Connection对象!Connection是线程安全的,
     *                   可以在多个线程中共享同一个Connection实例!
     *                     Connection的生命周期也是用户自己控制!
     * 
     *                 从Connection中获取Table和Admin对象的实例!Table和Admin对象的创建是轻量级,且不是线程安全的!
     *                 因此不建议池化或缓存Table和Admin对象的实例,每个线程有自己的Table和Admin对象的实例!
     * 
     * 4. NamespaceDescriptor: 用来代表和定义名称空间
     */
    public class NameSpaceUtil {
        
        private static Logger logger=LoggerFactory.getLogger(NameSpaceUtil.class);
        
        //查询所有的名称空间
        public static List<String> listNameSpace(Connection conn) throws IOException{
            
            List<String> nss=new ArrayList<>();
            
            //提供一个Admin
            Admin admin = conn.getAdmin();
            
            //查询所有的库
            NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
            
            for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
                //取出每个库描述中库的名称
                nss.add(namespaceDescriptor.getName());
                
            }
            
            //关闭admin
            admin.close();
            
            return nss;
            
        }
        
        //判断是否库存在
        
        public static boolean ifNSExists(Connection conn,String nsname) throws IOException {
            
            //库名校验
            if (StringUtils.isBlank(nsname)) {
                
                logger.error("请输入正常的库名!");
                //在后台提示,库名非法
                return false;
            }
            
            //提供一个Admin
            Admin admin = conn.getAdmin();
            
            //根据库名查询对应的NS,如果找不到就抛异常
            try {
                admin.getNamespaceDescriptor(nsname);
                
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                
                return false;
            }finally {
                
                //关闭admin
                admin.close();
            }
        }
        
        //创建库
        public static boolean creatNS(Connection conn,String nsname) throws IOException {
            
            //库名校验
            if (StringUtils.isBlank(nsname)) {
                
                logger.error("请输入正常的库名!");
                //在后台提示,库名非法
                return false;
            }
            
            //提供一个Admin
            Admin admin = conn.getAdmin();
            
            //新建库
            try {
                
                //先创建库的定义或描述
                NamespaceDescriptor descriptor = NamespaceDescriptor.create(nsname).build();
                
                admin.createNamespace(descriptor);
                
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                
                return false;
            }finally {
                
                //关闭admin
                admin.close();
            }
        }
        
        //删除库
            public static boolean deleteNS(Connection conn,String nsname) throws IOException {
                
                //库名校验
                if (StringUtils.isBlank(nsname)) {
                    
                    logger.error("请输入正常的库名!");
                    //在后台提示,库名非法
                    return false;
                }
                
                //提供一个Admin
                Admin admin = conn.getAdmin();
                
                
                //只能删除空库,判断当前库是否为empty,不为空无法删除
                //查询当前库下有哪些表
                List<String> tables = getTablesInNameSpace(conn, nsname);
                    
                if (tables.size()==0) {
                    
                    admin.deleteNamespace(nsname);
                    
                    //关闭admin
                    admin.close();
                    
                    return true;
                }else {
                    
                    //关闭admin
                    admin.close();
                    
                    logger.error(nsname+"库非空!无法删除!");
                    
                    return false;
                    
                }
                    
                    
                    
                    
            
                
            }
            
        //查询库下有哪些表
            public static List<String> getTablesInNameSpace(Connection conn,String nsname) throws IOException{
    
                //库名校验
                if (StringUtils.isBlank(nsname)) {
                    
                    logger.error("请输入正常的库名!");
                    //在后台提示,库名非法
                    return null;
                }
                
                
                List<String> tables=new ArrayList<>();
                
                //提供一个Admin
                Admin admin = conn.getAdmin();
                
                //查询当前库所有的表
                HTableDescriptor[] tableDescriptors = admin.listTableDescriptorsByNamespace(nsname);
                
                
                for (HTableDescriptor tableDescriptor : tableDescriptors) {
                    //取出每个表描述中表的名称
                    tables.add(tableDescriptor.getNameAsString());
                    
                }
                
                //关闭admin
                admin.close();
                
                return tables;
    
            }
    }
    View Code

    4、表的相关操作

    package com.me.hbase.utils;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /*
     * 1. 创建表和删除表
     * 
     * 2. TableName:  代表表名!
     *                 调用valueof(String 库名,String 表名),返回表名!
     *                 如果库名为null,此时使用default作为库名
     * 
     * 3. HTableDescriptor: 代表表的细节(描述),包含表中列族的描述!
     * 
     * 
     */
    public class TableUtil {
        
        private static Logger logger=LoggerFactory.getLogger(TableUtil.class);
    
        
        //验证表名是否合法并返回
        public static TableName checkTableName(String tableName,String nsname) {
            
            if (StringUtils.isBlank(tableName)) {
                
                logger.error("请输入正确的表名!");
                
                return null;
                
            }
            
            return TableName.valueOf(nsname, tableName);
            
        }
        
        //判断表是否存在
        public static boolean ifTableExists(Connection conn,String tableName,String nsname) throws IOException {
            
            //校验表名
            TableName tn = checkTableName(tableName, nsname);
            
            if (tn == null) {
                return false;
            }
            
            Admin admin = conn.getAdmin();
            
            //判断表是否存在,需要传入TableName对象
            boolean tableExists = admin.tableExists(tn);
            
            admin.close();
            
            return tableExists;
            
            
        }
        
        //创建表
        public static boolean createTable(Connection conn,String tableName,String nsname,String...cfs) throws IOException {
            
            //校验表名
            TableName tn = checkTableName(tableName, nsname);
                    
            if (tn == null) {
                return false;
            }
            
            //至少需要传入一个列族
            if (cfs.length < 1) {
                
                logger.error("至少需要指定一个列族!");
                
                return false;
                
            }
            
            Admin admin = conn.getAdmin();
            
            //创建表的描述
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tn);
            
            //讲列族的描述也添加到表的描述中
            for (String cf : cfs) {
                
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
                
                //添加列族的设置
                hColumnDescriptor.setMinVersions(3);
                hColumnDescriptor.setMaxVersions(10);
                
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            
            //根据表的描述创建表
            admin.createTable(hTableDescriptor);
            
            admin.close();
            
            return true;
            
            
        }
        
        
        
        //删除表
        public static boolean dropTable(Connection conn,String tableName,String nsname) throws IOException {
            
                //检查表是否存在
                if (!ifTableExists(conn, tableName, nsname)) {
                    
                    return false;
                }
            
                //校验表名
                TableName tn = checkTableName(tableName, nsname);
                
                Admin admin = conn.getAdmin();
                
                //删除之前需要先禁用表
                admin.disableTable(tn);
                
                //删除表
                admin.deleteTable(tn);
                
                admin.close();
                
                return true;
                
                
            }
    
    }
    View Code

    5、数据的相关操作

    package com.me.hbase.utils;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    /*
     * 1.数据的增删改查,需要使用的是Table
     * 
     * 2.Put: 代表对单行数据的put操作
     * 
     * 3. 在hbase中,操作的数据都是以byte[]形式存在,需要把常用的数据类型转为byte[]
     *             hbase提供了Bytes工具类
     *                 Bytes.toBytes(x): 基本数据类型转byte[]
     *                 Bytes.toXxx(x): 从byte[]转为Xxx类型!
     * 
     * 4. Get: 代表对单行数据的Get操作!
     * 
     * 5. Result: scan或get的单行的所有的记录!
     * 
     * 6. Cell: 代表一个单元格,hbase提供了CellUtil.clonexxx(Cell),来获取cell中的列族、列名和值属性!
     * 
     * 7. ResultScanner: 多行Result对象的集合
     */
    public class DataUtil {
        
        //先获取到表的table对象
        public static Table getTable(Connection conn,String tableName,String nsname) throws IOException {
            
            //验证表名是否合法
            TableName tn = TableUtil.checkTableName(tableName, nsname);
            
            if (tn == null) {
                return null;
            }
            
            //根据TableName获取对应的Table
            return conn.getTable(tn);
            
        }
        
        //put 表名,rowkey,列名(列族名:列名),value
        public static void put(Connection conn,String tableName,String nsname,String rowkey,String cf,
                String cq,String value) throws IOException {
            
            //获取表对象
            Table table = getTable(conn, tableName, nsname);
            
            if (table==null) {
                return;
            }
            
            //创建一个Put对象
            Put put = new Put(Bytes.toBytes(rowkey));
            
            //向put中设置cell的细节信息
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(value));
            //.addColumn(family, qualifier, value)
            
            table.put(put);
            
            table.close();
            
            
        }
        
        // get 表名   rowkey
        
        public static void get(Connection conn,String tableName,String nsname,String rowkey) throws IOException {
            
            //获取表对象
            Table table = getTable(conn, tableName, nsname);
            
            if (table==null) {
                return ;
            }
            
            Get get = new Get(Bytes.toBytes(rowkey));
            
            //设置单行查询的详细信息
            //设置查哪个列
            //get.addColumn(family, qualifier)
            //设置查哪个列族
            //get.addFamily(family)
            //只查某个时间戳的数据
            //get.setTimeStamp(timestamp)
            //设置返回的versions
            //get.setMaxVersions(maxVersions)
            
            Result result = table.get(get);
            
            //System.out.println(result);
            
            parseResult(result);
            
            table.close();
            
            
        }
        
        //遍历result
        public static void parseResult(Result result) {
            
            if (result != null) {
                
                Cell[] cells = result.rawCells();
                
                for (Cell cell : cells) {
                    
                    System.out.println("行:"+Bytes.toString(CellUtil.cloneRow(cell))+
                            "  列族:"+Bytes.toString(CellUtil.cloneFamily(cell))+"   列名:"+
                            Bytes.toString(CellUtil.cloneQualifier(cell))+
                            "  值:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    
                }
                
            }
            
        }
        
        //scan '表名', {STARTROW=> X,STOPROW=>X, LIMIT=>1}
        public static void scan(Connection conn,String tableName,String nsname) throws IOException {
            
            //获取表对象
            Table table = getTable(conn, tableName, nsname);
                    
            if (table==null) {
                return ;
            }
            
            //构建scan对象
            Scan scan = new Scan();
            
            //设置扫描的起始行
            //scan.setStartRow(startRow)
            //设置扫描的结束行
            //scan.setStopRow(stopRow)
            //设置扫描的列
            //scan.addFamily(Bytes.toBytes(column));
            //结果集扫描器
            ResultScanner scanner = table.getScanner(scan);
            
            for (Result result : scanner) {
                
                parseResult(result);
                
            }
            
            //关闭表
            table.close();
            
        }
    
        //scan '表名', {STARTROW=> X,STOPROW=>X, LIMIT=>1}
        public static void scanCol(Connection conn,String tableName,String nsname,String column) throws IOException {
    
            //获取表对象
            Table table = getTable(conn, tableName, nsname);
    
            if (table==null) {
                return ;
            }
    
            //构建scan对象
            Scan scan = new Scan();
    
            //设置扫描的起始行
            //scan.setStartRow(startRow)
            //设置扫描的结束行
            //scan.setStopRow(stopRow)
            //设置扫描的列
            scan.addFamily(Bytes.toBytes(column));
            //结果集扫描器
            ResultScanner scanner = table.getScanner(scan);
    
            for (Result result : scanner) {
    
                parseResult(result);
    
            }
    
            //关闭表
            table.close();
    
        }
        
        //delete '表名','rowkey',[列族][列][ts]
        public static void delete(Connection conn,String tableName,String nsname,String rowkey) throws IOException {
            
            //获取表对象
            Table table = getTable(conn, tableName, nsname);
                    
            if (table==null) {
                return ;
            }
            
            //构建delete对象
            Delete delete = new Delete(Bytes.toBytes(rowkey));
            
            //设置delete时的参数
            // 删除某个具体的列,为此列的最新的cell,添加一条type=DELETE的标记,只能删除最新的一条记录,如果有
            // 历史版本的记录,无法删除
            //delete.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"));
            
            //删除指定列的所有版本的数据,为当前列生成一个type=DeleteColumn的标记的记录
            //delete.addColumns(Bytes.toBytes("cf1"), Bytes.toBytes("age"));
            
            //删除整个列族
            //delete.addFamily(Bytes.toBytes("cf2"));
            
            table.delete(delete);
            
            //关闭表
            table.close();
            
        }
        
        
    
    }
    View Code

    6、测试

    package com.me.hbase.utils;
    
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    
    import java.io.IOException;
    
    public class HBaseSY {
    
        private static Connection conn = ConnectionUtil.getConn();
        private static String nsname = "ns1";
        //private static String nsname = null;
    
        /*
        createTable(String tableName, String[] fields)创建表,
        参数tableName为表的名称,字符串数组fields为存储记录各个域名称的数组。
        要求当HBase已经存在名为tableName的表的时候,先删除原有的表,然后再创建新的表。
         */
        public void createTable(String tableName, String[] fields) throws IOException {
    
            boolean exist = TableUtil.ifTableExists(conn, tableName, nsname);
            if (exist){
                TableUtil.dropTable(conn,tableName,nsname);
            }
            TableUtil.createTable(conn,tableName,nsname,fields);
    
        }
    
        /*
        addRecord(String tableName, String row, String[] fields, String[] values)
        向表tableName、行row(用S_Name表示)和字符串数组files指定的单元格中添加对应的数据values。
        其中fields中每个元素如果对应的列族下还有相应的列限定符的话,用“columnFamily:column”表示。
        例如,同时向“Math”、“Computer Science”、“English”三列添加成绩时,
        字符串数组fields为{“Score:Math”,”Score:Computer Science”,”Score:English”},
        数组values存储这三门课的成绩。
         */
        public  void addRecord(String tableName,String row,String[]fields,String[]values)throws Exception{
            for (int i = 0; i < fields.length; i++) {
                DataUtil.put(conn,tableName,nsname,row,fields[i],"",values[i]);
            }
    
        }
        /*
        scanColumn(String tableName, String column)浏览表tableName某一列的数据,
        如果某一行记录中该列数据不存在,则返回null。要求当参数column为某一列族名称时,如果底下有若干个列限定符,
        则要列出每个列限定符代表的列的数据;当参数column为某一列具体名称(例如“Score:Math”)时,只需要列出该列的数据。
         */
        public void scanColumn(String tableName,String column) throws IOException {
            DataUtil.scanCol(conn,tableName,nsname,column);
        }
    
        /*
         modifyData(String tableName, String row, String column,String val)
        修改表tableName,行row(可以用学生姓名S_Name表示),列column指定的单元格的数据。
         */
        public void modifyData(String tableName, String row, String column,String val) throws IOException {
            DataUtil.put(conn,tableName,nsname,row,column,"",val);
        }
    
        /*
        deleteRow(String tableName, String row)删除表tableName中row指定的行的记录。
         */
        public void  deleteRow(String tableName, String row) throws IOException {
            DataUtil.delete(conn,tableName,nsname,row);
        }
    
        public static void main(String[] args) throws Exception {
            HBaseSY hBaseSY = new HBaseSY();
            String [] str={"cs1","cs2"};
            String [] str1={"cs1"};
            String [] str2={"bujv"};
            String tn="cs";
            //hBaseSY.createTable(tn,str);
            //hBaseSY.addRecord(tn,"r2",str1,str2);
            //hBaseSY.scanColumn(tn,"cs1");
            //hBaseSY.modifyData(tn,"r1","cs1","xiaotian");
            hBaseSY.deleteRow(tn,"r2");
        }
    
    }
    View Code
  • 相关阅读:
    一、【注解】Spring注解@ComponentScan
    一致性Hash算法
    垃圾回收器搭配和调优
    JVM的逃逸分析
    简单理解垃圾回收
    类加载机制和双亲委派模型
    VMWare15下安装CentOS7
    HBase协处理器(1)
    依赖注入的三种方式
    Javascript-设计模式_装饰者模式
  • 原文地址:https://www.cnblogs.com/20183544-wangzhengshuai/p/13830036.html
Copyright © 2011-2022 走看看