--用java操作hbase
1、配置jar包环境
创建hbase项目 --> 新建folder文件夹 --> 将hbase相关jar包全部导入到该文件夹中 --> 选中这些jar包执行 add to build path --> 导入hbase conf文件夹下面的配置文件 (配置hbase环境时修改过的所有配置文件)-->
将配置文件放到java项目的src目录下面 (目的:让java找到hbase)--> 导入hadoop相关jar包
2、查看hbase API文档的方法:打开hbase源码安装包中docs文件夹下的apidocs
3、hbase基本操作源码
package com.wcg.Hbase; import java.io.IOException; import java.io.InterruptedIOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.generated.master.table_jsp; import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor.get; import org.apache.hadoop.hbase.thrift2.generated.TMutation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; import org.apache.hadoop.io.Stringifier; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.exceptions.verification.NeverWantedButInvoked; import com.google.common.collect.Table; import com.wcg.Hbase.Phone.PhoneDetail; import 
com.wcg.Hbase.Phone.dayPhoneDetail; public class HbaseDemo { Configuration conf = null; HBaseAdmin admin = null; private String TM = "phone"; HTable table = null; @Before public void init() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //首先进行初始化 conf = new Configuration(); //连接zookeeper conf.set("hbase.zookeeper.quorum", "node1,node2,node3"); admin = new HBaseAdmin(conf); table = new HTable(conf, TM); } @Test public void createTable() throws IOException{ //在创建表之前先进行判断该表是否已经存在 if(admin.tableExists(TM)){ admin.disableTable(TM); admin.deleteTable(TM); } //创建一个表描述的类 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TM)); HColumnDescriptor family = new HColumnDescriptor("cf".getBytes()); desc.addFamily(family); admin.createTable(desc); } //向表中插入数据 @Test public void insertDB() throws RetriesExhaustedWithDetailsException, InterruptedIOException{ String rowkey = "111111"; Put put = new Put(rowkey.getBytes()); put.add("cf".getBytes(), "name".getBytes(), "zhangsan".getBytes()); table.put(put); } /** * 模拟手机通话记录 * 一共有10个用户,每个用户产生了1000条数据 * @throws ParseException * @throws InterruptedIOException * @throws RetriesExhaustedWithDetailsException * @throws IOException */ @Test public void insertDB2() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException{ List<Put> list = new ArrayList<Put>(); for(int i=0;i<5;i++){ String phoneNum = getPhone("158"); for(int j= 0; j<1000;j++){ //产生一些其他类型的数据 String dnum = getPhone("182"); String length = r.nextInt(99)+""; String type = r.nextInt(2)+""; String date = getDate("2019"); //随机产生一个rowkey String rowkey = phoneNum+"_"+(Long.MAX_VALUE-sdf.parse(date).getTime()); Put put = new Put(rowkey.getBytes()); put.add("cf".getBytes(), "dnum".getBytes(), dnum.getBytes()); put.add("cf".getBytes(), "length".getBytes(), length.getBytes()); put.add("cf".getBytes(), "type".getBytes(), type.getBytes()); put.add("cf".getBytes(), "date".getBytes(), date.getBytes()); list.add(put); } } 
table.put(list); } /* * 使用protobuf进行数据插入 * */ @Test public void insertDB3() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException{ List<Put> list = new ArrayList<Put>(); for(int i=0;i<5;i++){ String phoneNum = getPhone("158"); for(int j= 0; j<1000;j++){ //产生一些其他类型的数据 String dnum = getPhone("182"); String length = r.nextInt(99)+""; String type = r.nextInt(2)+""; String date = getDate("2019"); //随机产生一个rowkey String rowkey = phoneNum+"_"+(Long.MAX_VALUE-sdf.parse(date).getTime()); Phone.PhoneDetail.Builder phoneDetail = Phone.PhoneDetail.newBuilder(); phoneDetail.setDate(date); phoneDetail.setDnum(dnum); phoneDetail.setLength(length); phoneDetail.setType(type); Put put = new Put(rowkey.getBytes()); put.add("cf".getBytes(), "phoneDetail".getBytes(), phoneDetail.build().toByteArray()); //注:不能写成phoneDetail.toString().getBytes(),这样会将只想对象的地址转化为字节数组 list.add(put); } } table.put(list); } /* * 以用户作为rowkey进行数据压缩 * * */ @Test public void insertDB4() throws ParseException, RetriesExhaustedWithDetailsException, InterruptedIOException{ List<Put> puts = new ArrayList<Put>(); for(int i =0;i<10;i++){ String phoneNum = getPhone("158"); String rowkey = phoneNum+"_"+(Long.MAX_VALUE-sdf.parse("20190101000000").getTime()); Phone.dayPhoneDetail.Builder dayPhone = Phone.dayPhoneDetail.newBuilder(); for (int j = 0 ;j<1000;j++){ //产生一些其他类型的数据 String dnum = getPhone("182"); String length = r.nextInt(99)+""; String type = r.nextInt(2)+""; String date = getDate2("20190101"); //随机产生一个rowkey Phone.PhoneDetail.Builder phoneDetail = Phone.PhoneDetail.newBuilder(); phoneDetail.setDate(date); phoneDetail.setDnum(dnum); phoneDetail.setLength(length); phoneDetail.setType(type); dayPhone.addDayofPhone(phoneDetail); } Put put = new Put(rowkey.getBytes()); put.add("cf".getBytes(), "day".getBytes(),dayPhone.build().toByteArray()); puts.add(put); } table.put(puts); } private String getDate2(String string) { return 
string+String.format("%02d%02d%02d",r.nextInt(24),r.nextInt(60),r.nextInt(60)); } /* * 通过get方法获取一条用protobuf存储的数据 * */ @Test public void get2() throws IOException{ Get get = new Get("15865543021_9223370490642209807".getBytes()); Result re = table.get(get); Cell cell = re.getColumnLatestCell("cf".getBytes(), "phoneDetail".getBytes()); //将获取到的Cell字节数组转化为一个对象 Phone.PhoneDetail phoneDetail1 = Phone.PhoneDetail.parseFrom(CellUtil.cloneValue(cell)); System.out.println(phoneDetail1); } /* * 用get方法获取之前插入的数据 * * */ @Test public void get3() throws IOException{ int count = 0; Get get = new Get("15897845910_9223370490582775807".getBytes()); Result re = table.get(get); Cell cell = re.getColumnLatestCell("cf".getBytes(), "day".getBytes()); //将获取到的Cell字节数组转化为一个对象 Phone.dayPhoneDetail dayPhone = Phone.dayPhoneDetail.parseFrom(CellUtil.cloneValue(cell)); for(PhoneDetail pd: dayPhone.getDayofPhoneList()){ System.out.println(pd); count++; } System.out.println(count); } /* * 查询某一个用户2月份的所有通话记录 * */ @Test public void scan2() throws ParseException, IOException{ String phoneNum = "15890601889"; String startRow = phoneNum+"_"+(Long.MAX_VALUE-sdf.parse("20190301000000").getTime()); String stopRow = phoneNum+"_"+(Long.MAX_VALUE-sdf.parse("20190201000000").getTime()); Scan scan= new Scan(); scan.setStartRow(startRow.getBytes()); scan.setStopRow(stopRow.getBytes()); //从table对象中获取scan对象 ResultScanner rss = table.getScanner(scan); for(Result rs : rss){ System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "dnum".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "length".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "type".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "date".getBytes()))))); } } /* * 查询主叫用户的数据 * 用过滤器对主叫用户的数据进行过滤 * */ @Test public void filter() throws IOException{ 
String phoneNum = "15890601889"; FilterList lists = new FilterList(); SingleColumnValueFilter filter1 = new SingleColumnValueFilter("cf".getBytes(), "type".getBytes(), CompareOp.EQUAL, "1".getBytes()); PrefixFilter filter2 = new PrefixFilter(phoneNum.getBytes()); lists.addFilter(filter1); lists.addFilter(filter2); Scan scan = new Scan(); scan.setFilter(lists); ResultScanner rss = table.getScanner(scan); for(Result rs : rss){ System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "dnum".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "length".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "type".getBytes()))))); System.out.println(new String(CellUtil.cloneValue((rs.getColumnLatestCell("cf".getBytes(), "date".getBytes()))))); } } private String getDate(String string) { return string+string.format("%02d%02d%02d%02d%02d", r.nextInt(12)+1,r.nextInt(31),r.nextInt(24),r.nextInt(60),r.nextInt(60)); } Random r = new Random(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); private String getPhone(String string){ //产生一个8位的随机整数,当数据不足8位的时候,在前面用0补齐 return string+String.format("%08d", r.nextInt(99999999)); } //用get的方式获取数据 @Test public void get() throws IOException{ String rowkey = "111111"; Get get = new Get(rowkey.getBytes()); Result rs = table.get(get); //Bytes.toString((rs.getValue("cf".getBytes(), "name".getBytes()))); System.out.println("+++++++++++++++数据开始得到+++++++++++++++"); Cell cell = rs.getColumnLatestCell("cf".getBytes(), "name".getBytes()); //System.out.println(org.apache.hadoop.hbase.util.Bytes.toString(cell.getValue())); String mm = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println(mm); System.out.println("++++++++++++++++数据已经得到+++++++++++++++++++++"); } @After public void destroy() throws IOException{ if(admin!=null){ admin.close(); } if(table!=null){ table.close(); } } }