zoukankan      html  css  js  c++  java
  • hbase(二)Java操作 hbase

    一、准备读取文件 hbaseFile.txt

    rowkey,name:firstName,name:lastName,address:province,address:city,address:district
    1,chen,allen,jiangsu,nanjing,xuanwu
    2,chen,henry,jiangsu,yancheng,jianhu
    3,li,pola,jiangsu,nanjing,xuanwu
    4,chen,angle,anhui,hefei,daqin
    5,fang,zhimin,anhui,wuhu,huijia
    6,ge,you,beijing,chaoyang,henan
    7,li,zhengming,jiangsu,nanjing,gulou

    二、pom.xml(maven-quickstart)

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>1.2.0-cdh5.14.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-common</artifactId>
          <version>1.2.0-cdh5.14.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>1.2.0-cdh5.14.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>2.6.0-cdh5.14.2</version>
        </dependency>
      </dependencies>

    三、目录

    1.base基础类

    package cn.kb08.hbase.core;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    
    import java.io.IOException;
    
    public class Base {
        /**
         * 根据配置信息获取连接对象
         * @return
         * @throws IOException
         */
        private Connection getCon() throws IOException {
            Configuration config = HBaseConfiguration.create();
            // 一个是hadoop的配置,一个是hbase的配置
            config.addResource("/opt/bigdata/hadoop/hadoop260/etc/hadoop/core-site.xml");
            config.addResource("/opt/bigdata/hadoop/hbase120/conf/hbase-site.xml");
            return ConnectionFactory.createConnection(config);
        }
    
        /**
         * 根据连接对象获取管理对象:操作对象(比如表,命名空间...)
         * @return
         * @throws IOException
         */
        public Admin admin() throws IOException {
            return getCon().getAdmin();
        }
    
        /**
         * 根据连接对象获取数据表对象:操作表数据(比如:增删改查)
         * @param tabName
         * @return
         * @throws IOException
         */
        public Table table(String tabName) throws IOException {
            return getCon().getTable(TableName.valueOf(tabName));
        }
    }

    2.dao

    package cn.kb08.hbase.core;
    
    import org.apache.commons.collections.map.ListOrderedMap;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.*;
    import java.util.*;
    
    /**
     * Command implementations for the HBase demo: table DDL, file import, delete, get, scan and
     * single-column-value filtering. Failures are reported with printStackTrace, as in the rest of
     * the demo.
     */
    public class Dao {
        private final Base base = new Base();
        private final Parser parser = new Parser();

        /**
         * Creates {@code tableName} with the given column families unless a table with that name exists.
         *
         * @param tableName   table to create
         * @param colFamilies column family names; at least one is required
         */
        public void createTable(String tableName, String[] colFamilies) {
            if (colFamilies == null || colFamilies.length == 0) {
                System.out.println("创建表" + tableName + "失败: 至少需要一个列族");
                return;
            }
            TableName tn = TableName.valueOf(tableName);
            try (Admin admin = base.admin()) {
                // tableExists rather than isTableAvailable: a disabled table is not "available" but
                // still exists, so createTable would fail on it.
                if (admin.tableExists(tn)) {
                    System.out.println("表" + tn.getNameAsString() + "已存在");
                    return;
                }
                HTableDescriptor descriptor = new HTableDescriptor(tn);
                for (String colFamily : colFamilies) {
                    descriptor.addFamily(new HColumnDescriptor(colFamily));
                }
                admin.createTable(descriptor);
                System.out.println("创建表" + tn.getNameAsString() + "成功");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Disables (if still enabled) and deletes {@code tableName}.
         *
         * @param tableName table to drop
         */
        public void dropTable(String tableName) {
            TableName tn = TableName.valueOf(tableName);
            try (Admin admin = base.admin()) {
                if (!admin.tableExists(tn)) {
                    System.out.println("表" + tn.getNameAsString() + "不存在");
                    return;
                }
                // deleteTable needs a disabled table; disabling an already-disabled table fails.
                if (admin.isTableEnabled(tn)) {
                    admin.disableTable(tn);
                }
                admin.deleteTable(tn);
                System.out.println("删除表" + tn.getNameAsString() + "成功");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Loads rows from a comma-separated text file (read as UTF-8) into {@code tableName}.
         *
         * <p>The first non-blank line is a header {@code rowkey,family:qualifier,...}. Every following
         * non-blank line is {@code rowkey,value1,value2,...}, where value i belongs to header column i.
         * Lines whose field count does not match the header, or whose row key is empty, are skipped.
         *
         * @param tableName target table
         * @param filePath  path of the data file
         */
        public void put(String tableName, String filePath) {
            File file = new File(filePath);
            if (!file.isFile()) {
                System.out.println("文件" + file.getAbsolutePath() + "不存在");
                return;
            }
            if (file.length() == 0) {
                System.out.println("文件" + file.getAbsolutePath() + "为空");
                return;
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                         new FileInputStream(file), java.nio.charset.StandardCharsets.UTF_8));
                 Table table = base.table(tableName)) {
                // Header columns in file order as {family, qualifier}; null until the header is read.
                List<String[]> columns = null;
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.trim().isEmpty()) {
                        continue; // e.g. a trailing newline; an empty row key would be rejected
                    }
                    if (columns == null) {
                        columns = parseHeader(line);
                        System.out.println("---TABLE STRUCTURE from FILE----");
                        printStructure(groupByFamily(columns));
                        System.out.println("-------------------------------");
                        continue;
                    }
                    // limit -1 keeps trailing empty values so the field count is reliable
                    String[] values = line.split(",", -1);
                    if (values.length != columns.size() + 1 || values[0].isEmpty()) {
                        System.out.println("跳过格式不正确的行: " + line);
                        continue;
                    }
                    Put put = new Put(Bytes.toBytes(values[0]));
                    // Positional mapping: value i+1 belongs to header column i, even when the
                    // header interleaves families.
                    for (int i = 0; i < columns.size(); i++) {
                        String[] column = columns.get(i);
                        put.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]),
                                Bytes.toBytes(values[i + 1]));
                    }
                    table.put(put);
                }
                System.out.println("新增表" + table.getName().getNameAsString() + "信息成功");
            } catch (IOException | RuntimeException e) {
                e.printStackTrace();
            }
        }

        /**
         * Splits a header line {@code rowkey,family:qualifier,...} into {family, qualifier} pairs in
         * file order. The first field (the row-key column name) is ignored.
         *
         * @throws IllegalArgumentException if a column is not of the form {@code family:qualifier}
         */
        private static List<String[]> parseHeader(String header) {
            String[] fields = header.split(",");
            List<String[]> columns = new ArrayList<>(Math.max(fields.length - 1, 0));
            for (int i = 1; i < fields.length; i++) {
                String[] parts = fields[i].split(":", 2);
                if (parts.length != 2 || parts[0].isEmpty()) {
                    throw new IllegalArgumentException("表头列格式应为 列族:列名, 实际为: " + fields[i]);
                }
                columns.add(parts);
            }
            return columns;
        }

        /** Groups header columns by family, keeping first-appearance order (used for display only). */
        private static Map<String, List<String>> groupByFamily(List<String[]> columns) {
            Map<String, List<String>> grouped = new LinkedHashMap<>();
            for (String[] column : columns) {
                grouped.computeIfAbsent(column[0], k -> new ArrayList<>()).add(column[1]);
            }
            return grouped;
        }

        /**
         * Deletes the given rows (all their cells) from {@code tableName}.
         *
         * @param tableName table to delete from
         * @param rowKeys   row keys to delete
         */
        public void delete(String tableName, String[] rowKeys) {
            try (Table table = base.table(tableName)) {
                List<Delete> dels = new ArrayList<>(rowKeys.length);
                for (String rowKey : rowKeys) {
                    dels.add(new Delete(Bytes.toBytes(rowKey)));
                }
                table.delete(dels);
                System.out.println("删除表" + table.getName().getNameAsString()
                        + "主键为 [" + String.join(", ", rowKeys) + "]的数据成功");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Prints one row, one family of a row, or one cell.
         *
         * @param tableName table to read
         * @param rowKey    row key
         * @param others    empty for the whole row, {@code family} for one family, or
         *                  {@code family, qualifier} for one cell
         */
        public void get(String tableName, String rowKey, String... others) {
            Get get = new Get(Bytes.toBytes(rowKey));
            if (others.length == 1) {
                get.addFamily(Bytes.toBytes(others[0]));
            } else if (others.length >= 2) {
                get.addColumn(Bytes.toBytes(others[0]), Bytes.toBytes(others[1]));
            }
            try (Table table = base.table(tableName)) {
                Result result = table.get(get);
                if (result.isEmpty()) {
                    System.out.println("表" + tableName + "中不存在主键为 [" + rowKey + "] 的数据");
                    return;
                }
                if (others.length == 0) {
                    // Whole row: derive the family/column layout from the returned cells.
                    parser.print(result, parser.parse(table, result));
                } else if (others.length == 1) {
                    // Only the requested family.
                    parser.print(result, parser.parse(table, result, others[0]));
                } else {
                    System.out.println(Bytes.toString(
                            result.getValue(Bytes.toBytes(others[0]), Bytes.toBytes(others[1]))));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Prints every row of the table, or the rows in {@code [rowKeys[0], rowKeys[last])}.
         *
         * @param tableName table to scan
         * @param rowKeys   empty for a full scan; otherwise the first is the start row (inclusive) and
         *                  the last is the stop row (exclusive)
         */
        public void scan(String tableName, String... rowKeys) {
            Scan scan = new Scan();
            try (Table table = base.table(tableName)) {
                if (rowKeys.length == 0) {
                    for (HColumnDescriptor columnFamily : table.getTableDescriptor().getColumnFamilies()) {
                        scan.addFamily(columnFamily.getName());
                    }
                } else {
                    // StartRow <= row < StopRow. With a single key, start == stop, which the HBase 1.x
                    // client appears to treat as a single-row scan (see Scan#isGetScan) — confirm.
                    scan.setStartRow(Bytes.toBytes(rowKeys[0]));
                    scan.setStopRow(Bytes.toBytes(rowKeys[rowKeys.length - 1]));
                }
                try (ResultScanner scanner = table.getScanner(scan)) {
                    printResults(table, scanner);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Prints every row whose {@code columnFamily:column} cell equals {@code value}. This is a
         * full-table scan with a server-side filter.
         *
         * @param tableName    table to scan
         * @param columnFamily family of the tested column
         * @param column       qualifier of the tested column
         * @param value        value the cell must equal
         */
        public void filter(String tableName, String columnFamily, String column, String value) {
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                    CompareFilter.CompareOp.EQUAL, Bytes.toBytes(value));
            // Without this, rows that do not contain the tested column at all also pass the filter.
            filter.setFilterIfMissing(true);
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (Table table = base.table(tableName)) {
                for (HColumnDescriptor cf : table.getTableDescriptor().getColumnFamilies()) {
                    scan.addFamily(cf.getName());
                }
                try (ResultScanner scanner = table.getScanner(scan)) {
                    printResults(table, scanner);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Prints the layout derived from the first result, then every result. Prints nothing when the
         * scanner is empty. Columns absent from the first row are not shown for later rows.
         */
        private void printResults(Table table, ResultScanner scanner) throws IOException {
            Iterator<Result> it = scanner.iterator();
            if (!it.hasNext()) {
                return;
            }
            Result next = it.next();
            Map<String, List<String>> structure = parser.parse(table, next);
            System.out.println("-----TABLE STRUCTURE------");
            printStructure(structure);
            System.out.println("--------------------------");
            parser.print(next, structure);
            while (it.hasNext()) {
                next = it.next();
                System.out.println("----------next-----------");
                parser.print(next, structure);
            }
        }

        /** Prints each family followed by its columns as "- column" lines. */
        private static void printStructure(Map<String, List<String>> structure) {
            for (Map.Entry<String, List<String>> entry : structure.entrySet()) {
                System.out.println(entry.getKey());
                for (String col : entry.getValue()) {
                    System.out.println("- " + col);
                }
            }
        }
    }

    3.parser工具类

    package cn.kb08.hbase.core;
    
    import org.apache.commons.collections.map.ListOrderedMap;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    
    /** Helpers that derive a table layout from HBase results and print result rows. */
    public class Parser {
        /**
         * Builds an insertion-ordered "family -> column qualifiers" map of the cells in {@code result}.
         *
         * <p>Families are visited in the order returned by the table descriptor. Families that have no
         * cells in {@code result} are omitted.
         *
         * @param table  table whose descriptor supplies the column families
         * @param result row to inspect; an empty result yields an empty map
         * @param cf     optional family names; when given, only these families are included
         * @return family name mapped to the column qualifiers present in {@code result}
         * @throws IOException if the table descriptor cannot be read
         */
        public Map<String, List<String>> parse(Table table, Result result, String... cf) throws IOException {
            Map<String, List<String>> structure = new java.util.LinkedHashMap<>();
            // Result#getFamilyMap returns null for an empty Result (e.g. a Get on a missing row).
            if (result == null || result.isEmpty()) {
                return structure;
            }
            for (HColumnDescriptor family : table.getTableDescriptor().getColumnFamilies()) {
                String name = family.getNameAsString();
                if (!isRequested(cf, name)) {
                    continue;
                }
                // qualifier -> value for this family
                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(family.getName());
                if (familyMap == null || familyMap.isEmpty()) {
                    continue;
                }
                List<String> cols = new ArrayList<>(familyMap.size());
                for (byte[] qualifier : familyMap.keySet()) {
                    cols.add(Bytes.toString(qualifier));
                }
                structure.put(name, cols);
            }
            return structure;
        }

        /** True when no family filter was given, or when {@code name} is one of the requested families. */
        private static boolean isRequested(String[] cf, String name) {
            if (cf == null || cf.length == 0) {
                return true;
            }
            for (String requested : cf) {
                if (name.equals(requested)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Prints the values of {@code rst} on one tab-separated line, in the order given by
         * {@code structure}. Missing cells are printed as "null".
         *
         * @param rst       result row
         * @param structure family -> column qualifiers to print
         */
        public void print(Result rst, Map<String, List<String>> structure) {
            StringBuilder builder = new StringBuilder();
            boolean first = true;
            for (Map.Entry<String, List<String>> entry : structure.entrySet()) {
                byte[] family = Bytes.toBytes(entry.getKey());
                for (String col : entry.getValue()) {
                    if (first) {
                        first = false;
                    } else {
                        builder.append("\t");
                    }
                    builder.append(Bytes.toString(rst.getValue(family, Bytes.toBytes(col))));
                }
            }
            System.out.println(builder.toString());
        }
    }

    4.app执行类

    /**
     * Command-line entry point.
     *
     * <p>Arguments: 0 = command, 1 = table name, then command-specific arguments
     * (row keys, column family, column, ...). Prints a usage message for missing or invalid arguments.
     */
    public class App {
        private static final String USAGE =
                "用法: java -jar xxx.jar <命令> <表名> [参数...]\n"
                + "  tbCreate <表名> <列族>...\n"
                + "  tbDrop   <表名>\n"
                + "  dtPut    <表名> <文件路径>\n"
                + "  dtDel    <表名> <rowkey>...\n"
                + "  dtGet    <表名> <rowkey> [列族 [列]]\n"
                + "  dtScan   <表名> [startRow ... stopRow]\n"
                + "  dtFilter <表名> <列族> <列> <值>";

        public static void main(String[] args) throws IOException {
            // Check the argument count first so an empty args array no longer throws.
            if (args.length < 2 || !dispatch(new Dao(), args)) {
                System.out.println(USAGE);
            }
        }

        /**
         * Runs the command in {@code args} (command matched by prefix "tb"/"dt" and suffix).
         *
         * @return false if the command is unknown or its arguments are insufficient
         */
        private static boolean dispatch(Dao dao, String[] args) {
            String type = args[0];
            String tableName = args[1];
            if (type.startsWith("tb")) {
                if (type.endsWith("Create")) { // create table: 2.. = column families
                    if (args.length < 3) {
                        return false;
                    }
                    dao.createTable(tableName, tail(args, 2));
                    return true;
                }
                if (type.endsWith("Drop")) {
                    dao.dropTable(tableName);
                    return true;
                }
            } else if (type.startsWith("dt")) {
                if (type.endsWith("Put")) { // insert rows: 2 = data file
                    if (args.length < 3) {
                        return false;
                    }
                    dao.put(tableName, args[2]);
                    return true;
                }
                if (type.endsWith("Del")) { // delete rows: 2.. = row keys
                    if (args.length < 3) {
                        return false;
                    }
                    dao.delete(tableName, tail(args, 2));
                    return true;
                }
                if (type.endsWith("Get")) { // get: 2 = row key, 3.. = [family [column]]
                    if (args.length < 3) {
                        return false;
                    }
                    dao.get(tableName, args[2], tail(args, 3));
                    return true;
                }
                if (type.endsWith("Scan")) { // scan: 2.. = optional start/stop row keys
                    dao.scan(tableName, tail(args, 2));
                    return true;
                }
                if (type.endsWith("Filter")) { // filter: 2 = family, 3 = column, 4 = value
                    if (args.length < 5) {
                        return false;
                    }
                    dao.filter(tableName, args[2], args[3], args[4]);
                    return true;
                }
            }
            return false;
        }

        /** Returns a copy of {@code args[from..]} (empty when {@code from >= args.length}). */
        private static String[] tail(String[] args, int from) {
            return java.util.Arrays.copyOfRange(args, Math.min(from, args.length), args.length);
        }
    }

    四、打jar包:打成胖jar包(包含maven依赖,这样linux上不安装maven也可以用)

    *** 一定要在 META-INF/MANIFEST.MF 中添加启动入口: Main-Class: cn.kb08.hbase.App(须与 App 类实际的包名一致)

    之后才可以使用 java -jar xxx.jar a b c 的方式运行程序并传入参数。

    五、测试命令:

    tbCreate student name address

    dtPut student /root/kb08/hbase/hbaseFile.txt

    dtDel student 1 2

    dtGet student 3
    dtGet student 3 address
    dtGet student 3 address city

    dtScan student
    dtScan student 3
    dtScan student 3 5 【扫描区间为 [3, 5),StopRow 不包含在内,因此查到的是 rowkey = 3 和 4 的数据!】

    dtFilter student address city nanjing 根据<表名-列族-列-值>进行条件查询【输出形式与按 rowkey 查询相同,但底层是全表扫描 + 列值过滤,返回该列值等于给定值的所有行】

  • 相关阅读:
    MySql—修改权限
    linux apache Tomcat配置SSL(https)步骤
    spark-shell启动错误
    spark
    Ubuntu不能连接网络
    NSGA-II算法学习
    SpringBoot集成mybatis,同时读取一个数据库中多个数据表
    设置虚拟机ip地址
    发送邮件
    spring session
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13604851.html
Copyright © 2011-2022 走看看