zoukankan      html  css  js  c++  java
  • 大数据之路Week10_day01 (练习:通过设计rowkey来实现查询需求)

    1、准备数据

    链接:https://pan.baidu.com/s/1fRECXp0oWM1xgxc0uoniAA
    提取码:4k43 

    2、需求如下

      (1)查询某个人最近出现的10条位置信息

      (2)查询出某个人在某一天在某个城市的所有位置信息

    3、设计rowkey

      手机号_(某一个大值 - 进入位置的开始时间)

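      因为HBase的rowkey按字典顺序升序排列,如果直接用"手机号_开始时间"作为rowkey,同一个人的记录会按时间升序排列;而需求要先取最新的位置信息,需要倒序。用一个足够大的值减去开始时间后,时间越新差值越小,对应的rowkey就排在越前面。注意:差值要补零到固定长度,否则位数不同时字典顺序与数值大小不一致(例如"9"会排在"10"之后)。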

    4、编写连接、创建表、插入数据、查询数据的代码

    package com.wyh.parctise;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.filter.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class dianxin_Demo1 {
        Configuration conf;
        HConnection conn;
        HBaseAdmin hBaseAdmin;
    
        @Before
        public void Cline(){
            try {
                conf = new Configuration();
                conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
                conn = HConnectionManager.createConnection(conf);
                hBaseAdmin = new HBaseAdmin(conf);
                System.out.println("建立连接成功。。"+conn);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 创建表
         */
        @Test
        public void createTable(){
            try {
                //创建表
                HTableDescriptor dianxin_1 = new HTableDescriptor("dianxin_1");
                //设置表的列簇
                HColumnDescriptor info = new HColumnDescriptor("info");
    
                //将列簇加入到
                dianxin_1.addFamily(info);
                //通过Hmaster的对象进行创建表
                hBaseAdmin.createTable(dianxin_1);
    
                System.out.println(Bytes.toString(dianxin_1.getName())+"表 创建完毕。。");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 往表中批量的插入数据
         */
        @Test
        public void putAllData(){
            //定义一个ArrayList集合存Put实例
            ArrayList<Put> puts = new ArrayList<>();
            try {
                //获取到表的实例
                HTableInterface dianxin_1 = conn.getTable("dianxin_1");
                //读取文件信息
                BufferedReader br = new BufferedReader(new FileReader("D:\shujia\shujia006\hadoop\src\data\dianxin_data"));
                String line;
                while ((line=br.readLine())!=null){
                    //使用字符串的分割方法将其一行数据进行分割
                    String[] split = line.split(",");
                    if(!"\N".equals(split[5])){
                        Long l = 20200000000000L-Long.parseLong(split[5]);
                        String startTime = String.valueOf(l);
    
                        //将开始位置和手机号拼接作为rowkey
                        String rowkey = split[0]+"_"+startTime;
    
                        String wg = split[1];
                        String cityID = split[2];
                        String qxID = split[3];
                        String stayTime = split[4];
                        String leaveTime = split[6];
                        String day = split[7];
    
    
                        //创建一个Put实例,将rowkey作为rowkey
                        Put put = new Put(rowkey.getBytes());
    
                        //将其他的字段作为列值加入
                        put.add("info".getBytes(),"wg".getBytes(),wg.getBytes());
                        put.add("info".getBytes(),"cityID".getBytes(),cityID.getBytes());
                        put.add("info".getBytes(),"qxID".getBytes(),qxID.getBytes());
                        put.add("info".getBytes(),"stayTime".getBytes(),stayTime.getBytes());
                        put.add("info".getBytes(),"leaveTime".getBytes(),leaveTime.getBytes());
                        put.add("info".getBytes(),"day".getBytes(),day.getBytes());
    
                        //将这个Put加入到puts集合中去
                        puts.add(put);
                    }
    
                }
    
                //将这个Put集合进行添加
                dianxin_1.put(puts);
                System.out.println(br.getClass().getName()+"文件数据加载完毕。。。");
    
                //关闭输入流
                br.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
    
        /**
         * 读取一条数据
         */
        @Test
        public void getData(){
            try {
                //获取到表的实例
                HTableInterface dianxin_1 = conn.getTable("dianxin_1");
                //创建get实例 提供一个行键
                Long i = 20200000000000L - 20180503190539L;
                String s1 = "D55433A437AEC8D8D3DB2BCA56E9E64392A9D93C_"+i;
                Get get = new Get(s1.getBytes());
    
                //通过调用get()方法来过去数据
                Result result = dianxin_1.get(get);
    
                /**
                 * 第一种方式,已知列
                 */
                //调用getrow获取二进制数组结果,再用Bytes工具类的toString()方法转换为字符串
                String id = Bytes.toString(result.getRow());
                String[] split = id.split("_");
                String pthone = split[0];
                String startTime = split[1];
                Long newstartTime = 20200000000000L-Long.parseLong(startTime);
    
                //调用返回结果的getValue()方法获取到每个字段值
                String wg = Bytes.toString(result.getValue("info".getBytes(), "wg".getBytes()));
                String cityID = Bytes.toString(result.getValue("info".getBytes(), "cityID".getBytes()));
                String qxID = Bytes.toString(result.getValue("info".getBytes(), "qxID".getBytes()));
                String stayTime = Bytes.toString(result.getValue("info".getBytes(), "stayTime".getBytes()));
                String leaveTime = Bytes.toString(result.getValue("info".getBytes(), "leaveTime".getBytes()));
                String day = Bytes.toString(result.getValue("info".getBytes(), "day".getBytes()));
    
                //输出到控制台
                System.out.println(pthone+"	"+wg+"	"+cityID+"	"+qxID+"	"+stayTime+"	"+newstartTime+"	"+leaveTime+"	"+day);
    
                /**
                 * 第二种方式,未知列
                 *
                 * @deprecated as of 0.96, use {@link CellUtil#cloneValue(Cell)}
                 */
                List<Cell> cells = result.listCells();
                String id1 = Bytes.toString(result.getRow());
                String[] split1 = id1.split("_");
                String pthone1 = split1[0];
                String startTime1 = split1[1];
    
                System.out.print(pthone1+"	"+startTime1+"	");
                for (Cell cell : cells) {
                    String s = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.print(s);
                    System.out.print("	");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 读取某个人的最新10条位置信息
         */
        @Test
        public void scanData(){
            try {
                //获取到表的实例
                HTableInterface dianxin_1 = conn.getTable("dianxin_1");
                //创建scan实例
                Scan scan = new Scan();
    
                //创建比较器 二进制前缀比较器
                BinaryPrefixComparator binaryPrefixComparator1 = new BinaryPrefixComparator("47BE1E866CFC071DB19D5E1C056BE28AE24C16E7".getBytes());
                RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator1);
    
    
                //创建一个过滤器集合
                FilterList filterList = new FilterList();
                //将多个过滤器加进去
                filterList.addFilter(rowFilter);
    
                //添加过滤器
                scan.setFilter(filterList);
    
                //获取到scan结果集
                ResultScanner scanner = dianxin_1.getScanner(scan);
    
                int count=0;
                //遍历这个结果集
                Result result;
                while ((result=scanner.next())!=null){
                   print(result);
                   count = count+1;
                   if(count==10){
                       break;
                   }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
        /**
         * 读取某一天,某个人在某个城市的所有位置信息
         */
        @Test
        public void scanData1(){
            try {
                //获取到表的实例
                HTableInterface dianxin_1 = conn.getTable("dianxin_1");
                //创建scan实例
                Scan scan = new Scan();
    
                //创建比较器 二进制前缀比较器
                BinaryPrefixComparator binaryPrefixComparator1 = new BinaryPrefixComparator("47BE1E866CFC071DB19D5E1C056BE28AE24C16E7".getBytes());
                RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator1);
    
                //包含比较器
                SubstringComparator substringComparator = new SubstringComparator("20180503");
    
                //创建一个过滤器 列值过滤器
                SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter("info".getBytes(), "day".getBytes(),
                        CompareFilter.CompareOp.EQUAL, substringComparator);
    
                //包含比较器
                SubstringComparator substringComparator1 = new SubstringComparator("83401");
    
                //创建一个过滤器 列值过滤器
                SingleColumnValueFilter singleColumnValueFilter2 = new SingleColumnValueFilter("info".getBytes(), "cityID".getBytes(),
                        CompareFilter.CompareOp.EQUAL, substringComparator1);
    
    
                //创建一个过滤器集合
                FilterList filterList = new FilterList();
                //将多个过滤器加进去
                filterList.addFilter(rowFilter);
                filterList.addFilter(singleColumnValueFilter2);
                filterList.addFilter(singleColumnValueFilter1);
    
                //添加过滤器
                scan.setFilter(filterList);
    
                //获取到scan结果集
                ResultScanner scanner = dianxin_1.getScanner(scan);
    
                //遍历这个结果集
                Result result;
                while ((result=scanner.next())!=null){
                    print(result);
                }
                scanner.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    
    
        /**
         * 关闭Hbase的连接
         */
        @After
        public void close(){
            /**
             * 关闭hBaseAdmin
             */
            for (int i = 0;i<5;i++){
                System.out.println();
            }
    
            if(hBaseAdmin!=null){
                try {
                    hBaseAdmin.close();
                    System.out.println("hBaseAdmin连接已经关闭。。");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            /**
             * 关闭connection
             */
            if(conn!=null){
                try {
                    conn.close();
                    System.out.println("connection连接已经关闭。。");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public static void print(Result result){
            String id = Bytes.toString(result.getRow());
            String[] split = id.split("_");
            String pthone = split[0];
            String startTime = split[1];
            Long newstartTime = 20200000000000L-Long.parseLong(startTime);
    
            //调用返回结果的getValue()方法获取到每个字段值
            String wg = Bytes.toString(result.getValue("info".getBytes(), "wg".getBytes()));
            String cityID = Bytes.toString(result.getValue("info".getBytes(), "cityID".getBytes()));
            String qxID = Bytes.toString(result.getValue("info".getBytes(), "qxID".getBytes()));
            String stayTime = Bytes.toString(result.getValue("info".getBytes(), "stayTime".getBytes()));
            String leaveTime = Bytes.toString(result.getValue("info".getBytes(), "leaveTime".getBytes()));
            String day = Bytes.toString(result.getValue("info".getBytes(), "day".getBytes()));
    
            //输出到控制台
            System.out.println(pthone+"	"+wg+"	"+cityID+"	"+qxID+"	"+stayTime+"	"+newstartTime+"	"+leaveTime+"	"+day);
        }
    }
  • 相关阅读:
    第三周作业
    第二周作业
    第一周作业
    第三次作业
    第二次作业
    c语言最后一次作业
    第14、15教学周作业
    第七周作业
    第六周随笔
    第四周作业
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12168968.html
Copyright © 2011-2022 走看看