zoukankan      html  css  js  c++  java
  • hbase--知识点总结2

    --用java操作hbase  

      1、配置jar包环境  

           创建hbase项目 --> 新建folder文件夹 --> 将hbase相关jar包全部导入到java项目之中 --> add buildpath -->导入hbase conf文件夹下面的配置文件 (配置hbase环境时修改过的所有配置文件)-->

          将配置文件放到hbase的src目录下面 (目的:让java找到hbase)-->导入hadoop相关jar包

     2、查看hbase API文档的方法:打开hbase安装包中docs文件夹下的apidocs目录

      3、hbase基本操作源码

    package com.wcg.Hbase;
    
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.hadoop.classification.InterfaceAudience.Private;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.generated.master.table_jsp;
    import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor.get;
    import org.apache.hadoop.hbase.thrift2.generated.TMutation;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
    import org.apache.hadoop.io.Stringifier;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.mockito.exceptions.verification.NeverWantedButInvoked;
    
    import com.google.common.collect.Table;
    import com.wcg.Hbase.Phone.PhoneDetail;
    import com.wcg.Hbase.Phone.dayPhoneDetail;
    
    /**
     * JUnit-driven walkthrough of basic HBase client operations against the {@code phone} table:
     * creating the table, inserting plain and protobuf-encoded call records, and reading them
     * back with {@code Get}, ranged {@code Scan} and filters.
     *
     * <p>Row keys have the form {@code <phone>_<Long.MAX_VALUE - callTimeMillis>}, so for one
     * phone number newer calls sort before older ones.
     *
     * <p>All String/byte[] conversions go through {@link Bytes} (UTF-8) instead of the
     * platform-default charset, so stored bytes do not depend on the JVM's locale.
     */
    public class HbaseDemo {

        /** The only column family used by the demo table. */
        private static final byte[] CF = Bytes.toBytes("cf");

        Configuration conf = null;
        HBaseAdmin admin = null;
        /** Name of the table every test operates on. */
        private String TM = "phone";
        HTable table = null;

        /** Source of all random test data. */
        Random r = new Random();
        /** Timestamp layout used both for the stored date column and for deriving row keys. */
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");


        /**
         * Builds the client configuration, pointing it at the ZooKeeper quorum, and opens the
         * admin and table handles used by the tests.
         *
         * @throws MasterNotRunningException propagated from the {@code HBaseAdmin} constructor
         * @throws ZooKeeperConnectionException propagated from the {@code HBaseAdmin} constructor
         * @throws IOException propagated from the {@code HBaseAdmin}/{@code HTable} constructors
         */
        @Before
        public void init() throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
            conf = new Configuration();
            // ZooKeeper quorum the client uses to connect.
            conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
            admin = new HBaseAdmin(conf);
            table = new HTable(conf, TM);
        }

        /**
         * (Re)creates the demo table with a single column family {@code cf}. An existing table
         * of the same name is disabled and deleted first.
         *
         * @throws IOException propagated from the admin calls
         */
        @Test
        public void createTable() throws IOException {
            // Drop a previous incarnation so the create below cannot clash with it.
            if (admin.tableExists(TM)) {
                admin.disableTable(TM);
                admin.deleteTable(TM);
            }

            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TM));
            desc.addFamily(new HColumnDescriptor(CF));
            admin.createTable(desc);
        }

        /**
         * Inserts a single cell: row {@code 111111}, column {@code cf:name}, value
         * {@code zhangsan}.
         *
         * @throws RetriesExhaustedWithDetailsException propagated from {@code table.put}
         * @throws InterruptedIOException propagated from {@code table.put}
         */
        @Test
        public void insertDB() throws RetriesExhaustedWithDetailsException, InterruptedIOException {
            Put put = new Put(Bytes.toBytes("111111"));
            put.add(CF, Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            table.put(put);
        }

        /**
         * Simulates phone call records: 5 random callers, 1000 calls each, one row per call with
         * the columns {@code dnum}, {@code length}, {@code type} and {@code date} stored as text.
         *
         * @throws ParseException if a generated date cannot be parsed by {@code sdf}
         * @throws RetriesExhaustedWithDetailsException propagated from {@code table.put}
         * @throws InterruptedIOException propagated from {@code table.put}
         */
        @Test
        public void insertDB2() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException {
            List<Put> list = new ArrayList<Put>();
            for (int i = 0; i < 5; i++) {
                String phoneNum = getPhone("158");
                for (int j = 0; j < 1000; j++) {
                    // Random payload for one call.
                    String dnum = getPhone("182");
                    String length = r.nextInt(99) + "";
                    String type = r.nextInt(2) + "";
                    String date = getDate("2019");
                    // Reverse-timestamp suffix: newer calls get smaller keys.
                    String rowkey = phoneNum + "_" + (Long.MAX_VALUE - sdf.parse(date).getTime());
                    Put put = new Put(Bytes.toBytes(rowkey));
                    put.add(CF, Bytes.toBytes("dnum"), Bytes.toBytes(dnum));
                    put.add(CF, Bytes.toBytes("length"), Bytes.toBytes(length));
                    put.add(CF, Bytes.toBytes("type"), Bytes.toBytes(type));
                    put.add(CF, Bytes.toBytes("date"), Bytes.toBytes(date));
                    list.add(put);
                }
            }
            table.put(list);
        }

        /**
         * Same data shape as {@link #insertDB2()}, but each call is serialized as one protobuf
         * {@code Phone.PhoneDetail} message stored in the single column {@code cf:phoneDetail}.
         *
         * @throws ParseException if a generated date cannot be parsed by {@code sdf}
         * @throws RetriesExhaustedWithDetailsException propagated from {@code table.put}
         * @throws InterruptedIOException propagated from {@code table.put}
         */
        @Test
        public void insertDB3() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException {
            List<Put> list = new ArrayList<Put>();
            for (int i = 0; i < 5; i++) {
                String phoneNum = getPhone("158");
                for (int j = 0; j < 1000; j++) {
                    // Random payload for one call.
                    String dnum = getPhone("182");
                    String length = r.nextInt(99) + "";
                    String type = r.nextInt(2) + "";
                    String date = getDate("2019");
                    String rowkey = phoneNum + "_" + (Long.MAX_VALUE - sdf.parse(date).getTime());
                    Phone.PhoneDetail.Builder phoneDetail = Phone.PhoneDetail.newBuilder();
                    phoneDetail.setDate(date);
                    phoneDetail.setDnum(dnum);
                    phoneDetail.setLength(length);
                    phoneDetail.setType(type);
                    Put put = new Put(Bytes.toBytes(rowkey));
                    // Store the serialized message itself; the builder's toString() output is
                    // not the wire format and could not be parsed back with parseFrom.
                    put.add(CF, Bytes.toBytes("phoneDetail"), phoneDetail.build().toByteArray());
                    list.add(put);
                }
            }
            table.put(list);
        }

        /**
         * Compacts storage by writing one row per caller: 10 random callers, each with 1000
         * calls on 2019-01-01 packed into a single {@code Phone.dayPhoneDetail} message stored
         * in {@code cf:day}.
         *
         * @throws ParseException if the fixed day timestamp cannot be parsed by {@code sdf}
         * @throws RetriesExhaustedWithDetailsException propagated from {@code table.put}
         * @throws InterruptedIOException propagated from {@code table.put}
         */
        @Test
        public void insertDB4() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException {
            List<Put> puts = new ArrayList<Put>();
            for (int i = 0; i < 10; i++) {
                String phoneNum = getPhone("158");
                String rowkey = phoneNum + "_" + (Long.MAX_VALUE - sdf.parse("20190101000000").getTime());
                Phone.dayPhoneDetail.Builder dayPhone = Phone.dayPhoneDetail.newBuilder();
                for (int j = 0; j < 1000; j++) {
                    // Random payload for one call within the day.
                    String dnum = getPhone("182");
                    String length = r.nextInt(99) + "";
                    String type = r.nextInt(2) + "";
                    String date = getDate2("20190101");
                    Phone.PhoneDetail.Builder phoneDetail = Phone.PhoneDetail.newBuilder();
                    phoneDetail.setDate(date);
                    phoneDetail.setDnum(dnum);
                    phoneDetail.setLength(length);
                    phoneDetail.setType(type);
                    dayPhone.addDayofPhone(phoneDetail);
                }
                Put put = new Put(Bytes.toBytes(rowkey));
                put.add(CF, Bytes.toBytes("day"), dayPhone.build().toByteArray());
                puts.add(put);
            }
            table.put(puts);
        }

        /**
         * Appends a random time of day ({@code HHmmss}) to the given day prefix.
         *
         * @param string day prefix, e.g. {@code 20190101}
         * @return the prefix followed by six digits of random hour, minute and second
         */
        private String getDate2(String string) {
            return string + String.format("%02d%02d%02d", r.nextInt(24), r.nextInt(60), r.nextInt(60));
        }

        /**
         * Fetches one row written by {@link #insertDB3()} and prints the decoded
         * {@code Phone.PhoneDetail} message.
         *
         * @throws IOException propagated from {@code table.get} or protobuf parsing
         * @throws IllegalStateException if the row has no {@code cf:phoneDetail} cell
         */
        @Test
        public void get2() throws IOException {
            String rowkey = "15865543021_9223370490642209807";
            Result re = table.get(new Get(Bytes.toBytes(rowkey)));
            // Decode the stored bytes back into a message object.
            Phone.PhoneDetail phoneDetail1 = Phone.PhoneDetail.parseFrom(requireValue(re, "phoneDetail", rowkey));
            System.out.println(phoneDetail1);
        }

        /**
         * Fetches one row written by {@link #insertDB4()}, prints every contained call record
         * and finally the number of records printed.
         *
         * @throws IOException propagated from {@code table.get} or protobuf parsing
         * @throws IllegalStateException if the row has no {@code cf:day} cell
         */
        @Test
        public void get3() throws IOException {
            int count = 0;
            String rowkey = "15897845910_9223370490582775807";
            Result re = table.get(new Get(Bytes.toBytes(rowkey)));
            // Decode the stored bytes back into a message object.
            Phone.dayPhoneDetail dayPhone = Phone.dayPhoneDetail.parseFrom(requireValue(re, "day", rowkey));
            for (PhoneDetail pd : dayPhone.getDayofPhoneList()) {
                System.out.println(pd);
                count++;
            }

            System.out.println(count);
        }

        /**
         * Prints all calls one user made in February 2019 (rows in the
         * {@link #insertDB2()} layout).
         *
         * <p>Because keys embed {@code Long.MAX_VALUE - time}, the later instant (March 1st)
         * gives the smaller key and therefore the scan start. Scan ranges are
         * start-inclusive/stop-exclusive on keys, so both bounds are shifted by one to make
         * the time range {@code [Feb 1 00:00:00, Mar 1 00:00:00)}.
         *
         * @throws ParseException if a boundary timestamp cannot be parsed by {@code sdf}
         * @throws IOException propagated from the scan
         */
        @Test
        public void scan2() throws ParseException, IOException {
            String phoneNum = "15890601889";
            long marchFirst = sdf.parse("20190301000000").getTime();
            long februaryFirst = sdf.parse("20190201000000").getTime();
            String startRow = phoneNum + "_" + (Long.MAX_VALUE - marchFirst + 1);
            String stopRow = phoneNum + "_" + (Long.MAX_VALUE - februaryFirst + 1);
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRow));
            scan.setStopRow(Bytes.toBytes(stopRow));
            ResultScanner rss = table.getScanner(scan);
            try {
                for (Result rs : rss) {
                    printCallRecord(rs);
                }
            } finally {
                // Release the scanner even if printing fails.
                rss.close();
            }
        }

        /**
         * Prints the calls of one user whose {@code cf:type} column equals {@code "1"}, using a
         * value filter combined with a row-key prefix filter.
         *
         * @throws IOException propagated from the scan
         */
        @Test
        public void filter() throws IOException {
            String phoneNum = "15890601889";
            byte[] prefix = Bytes.toBytes(phoneNum);
            FilterList lists = new FilterList();
            SingleColumnValueFilter filter1 = new SingleColumnValueFilter(CF, Bytes.toBytes("type"), CompareOp.EQUAL, Bytes.toBytes("1"));
            // By default rows lacking the column pass the filter; exclude them so
            // protobuf-only rows are not reported as matching type == 1.
            filter1.setFilterIfMissing(true);
            PrefixFilter filter2 = new PrefixFilter(prefix);
            lists.addFilter(filter1);
            lists.addFilter(filter2);
            Scan scan = new Scan();
            // No row before the prefix can match, so start scanning at the prefix itself.
            scan.setStartRow(prefix);
            scan.setFilter(lists);
            ResultScanner rss = table.getScanner(scan);
            try {
                for (Result rs : rss) {
                    printCallRecord(rs);
                }
            } finally {
                // Release the scanner even if printing fails.
                rss.close();
            }
        }

        /**
         * Appends a random, valid {@code MMddHHmmss} suffix to the given year prefix.
         *
         * <p>The day is drawn from 1..(days in the chosen month) so the result is always a real
         * calendar date; otherwise the lenient parser would silently roll values such as day 00
         * or February 30th into a neighbouring month, making the row key disagree with the
         * stored date text.
         *
         * @param string year prefix, e.g. {@code 2019}
         * @return the prefix followed by ten digits of random month, day, hour, minute, second
         */
        private String getDate(String string) {
            int month = r.nextInt(12) + 1;
            int day = r.nextInt(daysInMonth(string, month)) + 1;
            return string + String.format("%02d%02d%02d%02d%02d", month, day, r.nextInt(24), r.nextInt(60), r.nextInt(60));
        }

        /** Returns the number of days in {@code month} (1-12) of the given year prefix. */
        private static int daysInMonth(String year, int month) {
            switch (month) {
                case 2:
                    return isLeapYear(year) ? 29 : 28;
                case 4:
                case 6:
                case 9:
                case 11:
                    return 30;
                default:
                    return 31;
            }
        }

        /** Gregorian leap-year test; a non-numeric year prefix is treated as a common year. */
        private static boolean isLeapYear(String year) {
            try {
                int y = Integer.parseInt(year);
                return (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
            } catch (NumberFormatException ignored) {
                // Unknown year: 28 days in February is valid in every year.
                return false;
            }
        }

        /**
         * Builds a random phone number: the given prefix followed by eight digits, left-padded
         * with zeros.
         *
         * @param string number prefix, e.g. {@code 158}
         * @return the prefix followed by a random value in 00000000..99999999
         */
        private String getPhone(String string) {
            // The bound is exclusive, so 100000000 covers every eight-digit value.
            return string + String.format("%08d", r.nextInt(100000000));
        }

        /**
         * Reads back the cell written by {@link #insertDB()} and prints its value between two
         * marker lines.
         *
         * @throws IOException propagated from {@code table.get}
         * @throws IllegalStateException if the row has no {@code cf:name} cell
         */
        @Test
        public void get() throws IOException {
            String rowkey = "111111";

            Result rs = table.get(new Get(Bytes.toBytes(rowkey)));
            System.out.println("+++++++++++++++数据开始得到+++++++++++++++");
            String mm = Bytes.toString(requireValue(rs, "name", rowkey));
            System.out.println(mm);
            System.out.println("++++++++++++++++数据已经得到+++++++++++++++++++++");
        }

        /**
         * Closes the admin and table handles opened by {@link #init()}.
         *
         * @throws IOException propagated from either {@code close} call
         */
        @After
        public void destroy() throws IOException {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                // Still close the table when closing the admin handle failed.
                if (table != null) {
                    table.close();
                }
            }
        }

        /**
         * Returns the latest value of {@code cf:<qualifier>} in the given result.
         *
         * @throws IllegalStateException if the result holds no such cell (for example because
         *         the row does not exist), instead of failing later with a bare
         *         NullPointerException
         */
        private static byte[] requireValue(Result row, String qualifier, String rowkey) {
            Cell cell = row.getColumnLatestCell(CF, Bytes.toBytes(qualifier));
            if (cell == null) {
                throw new IllegalStateException("row '" + rowkey + "' has no cf:" + qualifier + " cell");
            }
            return CellUtil.cloneValue(cell);
        }

        /**
         * Prints the four text columns of one call record, one per line, in the order
         * dnum, length, type, date. A missing column is printed as {@code null}.
         */
        private static void printCallRecord(Result row) {
            String[] qualifiers = {"dnum", "length", "type", "date"};
            for (String qualifier : qualifiers) {
                Cell cell = row.getColumnLatestCell(CF, Bytes.toBytes(qualifier));
                String text = null;
                if (cell != null) {
                    text = Bytes.toString(CellUtil.cloneValue(cell));
                }
                System.out.println(text);
            }
        }

    }
  • 相关阅读:
    性能测试,负载测试,压力测试,容量测试的区别
    全文检索引擎Solr系列——整合MySQL、MongoDB
    全文检索引擎Solr系列——整合中文分词组件IKAnalyzer
    全文检索引擎Solr系列——整合中文分词组件mmseg4j
    全文检索引擎Solr系列——Solr核心概念、配置文件
    全文检索引擎Solr系列—–全文检索基本原理
    ES之一:Elasticsearch6.4 windows安装 head插件ik分词插件安装
    FTP,FTPS,FTPS与防火墙
    NGINX通过Stream转发ftp请求
    nginx 的第三方模块ngx_http_accesskey_module 来实现下载文件的防盗链步骤(linux系统下)
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/10508450.html
Copyright © 2011-2022 走看看