zoukankan      html  css  js  c++  java
  • JavaAPI与hbase的交互

    JavaAPI连接Hbase

    准备工作:

    •   确保集群各节点服务运行正常
    •   确保zookeeper可以正常工作
    •   已经开启hbase-master,hbase-regionserver
    •   环境:windows7下eclipse,集群正常工作。
      •   由于我们是在windows7下运行,所以我们将服务器中的hadoop程序拷贝到本地。
      •   在系统的环境变量中设置HADOOP_HOME,并且将%HADOOP_HOME%/bin添加到path当中。
      •   接下来在运行中,执行程序的时候eclipse会报错说找不到winutils.exe。所以我们还要下载一个winutils.exe,我这里提供一个github上的下载链接https://github.com/srccodes/hadoop-common-2.2.0-bin,下载以后解压到bin下即可。(版本虽然很旧,但是可以使用)

    连接hbase并进行简单操作:

    1. 创建JavaProject,导入需要的jar包,jar包来自于服务器上hbase中的lib文件夹下的jar文件,所以将lib直接拷贝到当前工程中,并build path(导入这些额外的包)。
    2. 同时拷贝hbase下的log4j.properties文件到项目中,在执行过程中可以查看到执行过程中产生的日志。
    3. 创建连接hbase需要的配置信息
      1. Java客户端其实就是shell客户端的一种实现,操作命令基本上就是shell客户端命令的一个映射。
      2. Java客户端使用的配置信息是被映射到了HbaseConfiguration的实例对象中的,使用create方法创建实例化对象的时候,会从classpath中获取hbase-site.xml文件并进行配置文件内容的读取。同时也会读取hadoop的配置文件信息。这里我们给定zookeeper的相关配置信息即可。
      3. 流程:先通过zookeeper拿到hbase:namespace的路径,然后从这个路径中拿到hbase:meta表的信息,接着就拿到了用户表的路径   
    4. 代码实现如下
       1 package com.hblink.demo;
       2 
       3 import java.io.IOException;
       4 
       5 import org.apache.hadoop.conf.Configuration;
       6 import org.apache.hadoop.hbase.HBaseConfiguration;
       7 
       8 public class Hblink {
       9     /**
      10      * 获取hbase的配置文件信息
      11      * 
      12      * @return
      13      * @throws IOException
      14      */
      15     public static Configuration getHBaseConfiguration() throws IOException {
      16         Configuration conf = HBaseConfiguration.create();
      17         // zookeeper的配置信息
      18         conf.set("hbase.zookeeper.quorum", "kslave5,kslave6,kslave7");// zookeeper节点信息
      19         conf.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
      20         // conf.set("dfs.socket.timeout", "180000");
      21         return conf;
      22     }
      23 }
    5. 有了配置信息以后,我们开始通过配置信息连接hbase

      HBaseAdmin类:是主要进行DDL操作相关的一个接口类,主要包括命名空间管理,用户表管理。通过该接口我们可以创建、删除、获取用户表,也可以进行用户表的分割,紧缩等操作。
      HTable类:是hbase中的用户表的一个映射的java实例,通过该类进行表数据的操作,包括数据的增删改查,也就是在这里我们可以类似shell中put,get和sacn进行数据的操作。

      HTableDescriptor类:是hbase用户表的具体描述信息类,一般我们创建表获取表信息,就是通过该类进行的。

       1 package com.hblink.test;
       2 
       3 import java.io.IOException;
       4 
       5 import org.apache.hadoop.conf.Configuration;
       6 import org.apache.hadoop.hbase.HColumnDescriptor;
       7 import org.apache.hadoop.hbase.HTableDescriptor;
       8 import org.apache.hadoop.hbase.TableName;
       9 import org.apache.hadoop.hbase.client.HBaseAdmin;
      10 
      11 import com.hblink.demo.Hblink;
      12 
      13 public class HbTest {
      14     public static void main(String[] args) throws Exception {
      15         Configuration configuration = Hblink.getHBaseConfiguration();
      16         HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
      17         try {
      18             createTestTable(hBaseAdmin);
      19         } finally {
      20             hBaseAdmin.close(); // 资源释放
      21         }
      22     }
      23 
      24     /**
      25      * 测试创建表table
      26      * 
      27      * @throws IOException
      28      */
      29     static void createTestTable(HBaseAdmin hbAdmin) throws IOException {
      30         TableName tableName = TableName.valueOf("stock-info"); // 创建表名
      31         HTableDescriptor hDescriptor = new HTableDescriptor(tableName);
      32         hDescriptor.addFamily(new HColumnDescriptor("f"));// 给定列族
      33         hbAdmin.createTable(hDescriptor);
      34         System.out.println("创建表成功!");
      35     }
      36 }
    6. 接下来是数据插入
       1 package com.hblink.test;
       2 
       3 import java.io.BufferedReader;
       4 import java.io.File;
       5 import java.io.FileInputStream;
       6 import java.io.FileNotFoundException;
       7 import java.io.IOException;
       8 import java.io.InputStreamReader;
       9 
      10 import org.apache.hadoop.conf.Configuration;
      11 import org.apache.hadoop.hbase.client.HTable;
      12 import org.apache.hadoop.hbase.client.Put;
      13 import org.apache.hadoop.hbase.util.Bytes;
      14 
      15 import com.hblink.demo.Hblink;
      16 
      17 public class TableTest {
      18     public static int count = 0;
      19 
      20     public static void main(String[] args) throws IOException {
      21 
      22         HTable hTable = null;
      23         Configuration configuration = Hblink.getHBaseConfiguration();
      24 
      25         hTable = new HTable(configuration, "stock-info");
      26 
      27         testPut(hTable);//插入数据
      28 
      29         hTable.close();//释放资源
      30 
      31     }
      32 
      33     /**
      34      * 测试往表里插入数据
      35      * 
      36      * @param hTable
      37      * @throws IOException
      38      */
      39     static void testPut(HTable hTable) throws IOException {
      40 
      41         File file = new File("./20171120sh.txt"); //获取本地文件
      42         InputStreamReader isr = null;
      43         try {
      44             isr = new InputStreamReader(new FileInputStream(file), "utf-8");
      45         } catch (FileNotFoundException e) {
      46             e.printStackTrace();
      47         }
      48         if (isr == null) {
      49             return;
      50         }
      51         BufferedReader br = new BufferedReader(isr);
      52         String re = "";
      53         while ((re = br.readLine()) != null) {
      54             String[] sarr = re.split(",");
      55             // System.out.println(sarr[0] + "-" + sarr[1] + "-" + sarr[2] + "-" + sarr[3] +
      56             // sarr[4] + "-" + sarr[5] + "-" + sarr[6]);
      57             // System.out.println(sarr[0]);
      58 
      59             Put put = new Put(Bytes.toBytes(sarr[0]));
      60             put.add(Bytes.toBytes("f"), Bytes.toBytes("Stock"), Bytes.toBytes(sarr[1]));
      61             put.add(Bytes.toBytes("f"), Bytes.toBytes("Date"), Bytes.toBytes(sarr[2]));
      62             put.add(Bytes.toBytes("f"), Bytes.toBytes("Top"), Bytes.toBytes(sarr[3]));
      63             put.add(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"), Bytes.toBytes(sarr[4]));
      64             put.add(Bytes.toBytes("f"), Bytes.toBytes("Volume"), Bytes.toBytes(sarr[5]));
      65             put.add(Bytes.toBytes("f"), Bytes.toBytes("Turnover"), Bytes.toBytes(sarr[6]));
      66             hTable.put(put);
      67             count++;
      68         }
      69         System.out.println("插入" + (count - 1) + "条数据成功!");
      70     }
      71 }
    7. 对表中数据的查询
        1 package com.hblink.test;
        2 
        3 import java.io.IOException;
        4 import java.util.Scanner;
        5 
        6 import org.apache.hadoop.conf.Configuration;
        7 import org.apache.hadoop.hbase.client.HTable;
        8 import org.apache.hadoop.hbase.client.Result;
        9 import org.apache.hadoop.hbase.client.ResultScanner;
       10 import org.apache.hadoop.hbase.client.Scan;
       11 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
       12 import org.apache.hadoop.hbase.filter.Filter;
       13 import org.apache.hadoop.hbase.filter.RegexStringComparator;
       14 import org.apache.hadoop.hbase.filter.RowFilter;
       15 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
       16 import org.apache.hadoop.hbase.util.Bytes;
       17 
       18 import com.hblink.demo.Hblink;
       19 
       20 public class ScanHbase {
       21     public static Boolean flag = true;
       22     public static String string = null;
       23 
       24     public static void main(String[] args) throws IOException {
       25         while (flag) {
       26             HTable hTable = null;
       27             Configuration configuration = Hblink.getHBaseConfiguration();
       28             hTable = new HTable(configuration, "stock-info");
       29 
       30             Scanner sc = new Scanner(System.in);
       31             System.out.print("请输入需要查询的股票代码:");
       32             string = sc.next();
       33 
       34             // scanTestCell(hTable);
       35             scanTestRow(hTable);
       36             hTable.close();
       37             if (string.equals("quit")) {
       38                 flag = false;
       39             }
       40         }
       41     }
       42 
       43     /**
       44      * 通过列查询
       45      * 
       46      * @param hTable
       47      * @throws IOException
       48      */
       49     static void scanTestCell(HTable hTable) throws IOException {
       50 
       51         // 设置过滤器
       52         SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("f"),
       53                 Bytes.toBytes("Date"), CompareOp.EQUAL, Bytes.toBytes(string));
       54         // 设置全表扫描封装类
       55         Scan scan = new Scan();
       56         // 添加过滤器(通过股票代码查询)
       57         scan.setFilter(singleColumnValueFilter);
       58         // 扫描
       59         ResultScanner resultScanner = hTable.getScanner(scan);
       60         for (Result result : resultScanner) {
       61             byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date"));
       62             byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock"));
       63             byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top"));
       64             byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"));
       65             byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume"));
       66             byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover"));
       67 
       68             System.out.print(Bytes.toString(data) + ";");
       69             System.out.print(Bytes.toString(stock) + ";");
       70             if (Bytes.toString(top).equals("--")) {
       71                 System.out.print(Bytes.toString(top) + ";");
       72             } else {
       73                 System.out.print(Bytes.toInt(top) + ";");
       74             }
       75             System.out.print(Bytes.toString(change_rate) + ";");
       76             System.out.print(Bytes.toString(volume) + ";");
       77             System.out.print(Bytes.toString(turnover));
       78             System.out.println();
       79 
       80         }
       81 
       82     }
       83 
       84     /**
       85      * 通过正则--匹配行键
       86      * 
       87      * @param hTable
       88      * @throws IOException
       89      */
       90     static void scanTestRow(HTable hTable) throws IOException {
       91         RegexStringComparator re = new RegexStringComparator("^" + string + "");
       92         Filter filter = new RowFilter(CompareOp.EQUAL, re);
       93         Scan scan = new Scan();
       94         // 添加过滤器(通过股票代码查询)
       95         scan.setFilter(filter);
       96         // 扫描
       97         ResultScanner resultScanner = hTable.getScanner(scan);
       98         for (Result result : resultScanner) {
       99             byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date"));
      100             byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock"));
      101             byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top"));
      102             byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"));
      103             byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume"));
      104             byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover"));
      105 
      106             System.out.print(Bytes.toString(data) + ";");
      107             System.out.print(Bytes.toString(stock) + ";");
      108             if (Bytes.toString(top).equals("--")) {
      109                 System.out.print(Bytes.toString(top) + ";");
      110             } else {
      111                 System.out.print(Bytes.toInt(top) + ";");
      112             }
      113             System.out.print(Bytes.toString(change_rate) + ";");
      114             System.out.print(Bytes.toString(volume) + ";");
      115             System.out.print(Bytes.toString(turnover));
      116             System.out.println();
      117 
      118         }
      119     }
      120 }


    ps.继续学习中=====

  • 相关阅读:
    hdu1316
    MVC中的ViewData、ViewBag和TempData
    linux下性能监控工具
    【翻译自mos文章】执行utlpwdmg.sql之后报ORA-28003, ORA-20001, ORA-20002, ORA-20003, ORA-20004 错误
    HTTP协议的消息头:Content-Type和Accept的作用 转载https://www.cnblogs.com/lexiaofei/p/7289436.html
    HTTP协议的消息头:Content-Type和Accept的作用
    JWT(JSON Web Token) 多网站的单点登录,放弃session 转载https://www.cnblogs.com/lexiaofei/p/7409846.html
    Http协议中get和post的区别
    常用的HTTP请求头与响应头
    浏览器获取自定义响应头response-headers
  • 原文地址:https://www.cnblogs.com/Try-kevin/p/7901245.html
Copyright © 2011-2022 走看看