zoukankan      html  css  js  c++  java
  • 分布式存储系统-HBASE

    转载请标明出处:
    http://blog.csdn.net/zwto1/article/details/47066523
    本文出自:【zhang_way的博客专栏】

    開始几天的HBase之旅!

    简单介绍

    HBase –Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBse技术可在便宜PC Server上搭建起大规模结构化存储集群。HBase利用Hadoop HDFS作为文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据。利用Zookeeper作为协调工具。

    HBase(NoSQL)的数据模型

    HBase 存储的站点页面数据演示样例

    HBase 存储的站点页面数据演示样例

    以下讲下关于HBase的相关名词和概念:
    表(table),是存储管理数据的。


    行键(row key)。相似于MySQL中的主键。Mysql中的主键可有可无。


    行键是HBase表天然自带的。
    列族(colum family),列的集合。
    HBASE列族是须要在定义表时指定的。仅仅须要定义列族而不须要定义列。

    列是在插入数据时动态添加的。
    时间戳(timestamp),是64位长整形整数,在我们数据库里它是一种日期类型能够精确到毫秒级别。在HBASE。是列(也称作标签、修饰符)的一个属性。
    行健和列确定的单元格,能够存储多个数据。每一个数据还有时间戳属性。数据具有版本号特性。
    假设不指定时间戳或者版本号,默认取最新的数据。
    HDFS中存储的数据都是字节数组。
    HBASE中表中的数据是依照行健的顺序物理存储的。

    MySQL中是依照插入的顺序存储的。行健是依照ASSIC码排序的。
    HBASE 是面向列的数据库。依照列族进行存储。关系型数据库是依照行存储的。
    列必须用族来定义,随意一列有例如以下形式:族:标签。

    族和标签都能够是随意的字符串。


    物理上将同族数据存储在一起。
    数据可通过时间戳来区分版本号。

    HBase(NoSQL)的物理模型

    HBase架构图

    HBase架构图

    Hbase适合海量数据(如20PB)的秒级简单查询的数据库。
    Hbase 表中的记录依照行健拆分成一个一个的region。
    Region 存储 region server(单独的物理机器)。这样,对表的操作转化为对多台region server 的并行查询。


    Hbase的体系结构:
    Hbase是主从式结构。HMaster、HRegionServer
    Hmaster:
    Hbase同意有多个HMaster.通过Zookeeper Election机制保证总有一个Master执行。
    Master为regionserver分配region。负责region server的负载均衡,发现失效的region server 并又一次分配其上的region
    HRegionServer:
    Regionserver维护master分配给它的region,处理对这些region的IO请求,负责切分在执行过程中变得过大的region。
    Zookeeper:
    保证不论什么时候 。集群中仅仅有一个running master
    存储全部 region的寻址入口
    实时监控region server的状态,将region server的上线和下线信息,实时通知给master
    存储 hbase的schema,包含哪些table每一个table有哪些column family

    Client訪问hbase上数据的过程并不须要master參与,寻址訪问zookeeper 和 region server,数据读写訪问regionserver。HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最核心的模块。
    Hbase中有两张特殊的Table,-ROOT-和.META.
    .META.:记录了用户的region信息,.META .能够有多个region
    -ROOT-:记录了.META.表的region信息,-ROOT-仅仅有一个region, Zookeeper中记录了-ROOT-表的location。

    Client訪问用户数据之前须要首先訪问zookeeper,然后訪问-ROOT-表,接着訪问.META.表,最后才干找到用户数据的位置訪问。

    HBase伪分布安装:

    1. 解压缩、重命名、环境变量设置
    2. 改动$HBASE_HOME/conf/hbase-env.sh,改动内容例如以下:

      export JAVA_HOME=/usr/local/jdk
      export HBASE_MANAGES_ZK=true
    3. 改动$HBASE_HOME/conf/hbase-site.xml,改动内容例如以下:

      <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop:9000/hbase</value>
      </property>
      <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
      </property>
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop</value>
      </property>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    4. (可选)文件regionservers的内容改为hadoop
    5. 启动hbase。执行命令start-hbase.sh
      启动hbase之前,确保hadoop是执行正常的,而且能够写入文件

    6. 验证:(1)执行jps。发现新添加了3个java进程,各自是HMaster、HRegionServer、HQuorumPeer,证明成功安装。
      (2)使用浏览器訪问http://hadoop:60010 查看webUI.

    HBase集群搭建

    1.hbase的集群搭建过程(在原来的hadoop上的hbase伪分布基础上进行搭建)
      1.1 集群结构,主节点(hmaster)是hadoop,从节点(region server)是hadoop1和hadoop2
       1.2 改动hadoop上的hbase的几个文件
       (1)改动hbase-env.sh的最后一行export HBASE_MANAGES_ZK=false

     (2)改动hbase-site.xml文件的hbase.zookeeper.quorum的值为hadoop0,hadoop1,hadoop2。

    改动dfs.replicationg改为3,表示副本数为3。
      (3)改动regionservers文件(存放的region server的hostname)。内容改动为hadoop1、hadoop2
      1.3 复制hadoop中的hbase目录到hadoop1、hadoop2中
    复制hadoop中的/etc/profile到hadoop1、hadoop2中,在hadoop1、hadoop2上执行source /etc/profile
      1.4 首先启动hadoop。然后启动zookeeper集群在各个节点上分别启动。
    最后在hadoop上启动hbase集群。

    出现的问题及解决的方法:

    由于在搭建集群的时候。hbase在执行状态下,改动了配置造成:
    1) hbase web 界面进不去
    2) hbase shell 操作没有结果
    3) 执行stop-hbase.sh 关闭不了
    解决的方法:
    1) 把全部节点关掉,再次启动
    2) 另一个解决的方法:杀死hbase进程 kill -9 pid 用ps –ef | grep 查看hbase进程

    以下我们学习下HBase下的shell操作。

    HBase Shell

    1. 进入hbase shell,终端:

      hbase shell
    2. hbase shell操作:
      创建表: create ‘表名称’, ‘列族名称1’,’列族名称2’,’列族名称N’
      加入记录: put ‘表名称’, ‘行名称’, ‘列名称:’, ‘值’
      查看记录: get ‘表名称’, ‘行名称’
      查看表中的记录总数: count ‘表名称’
      删除一张表: 先要屏蔽该表。才干对该表进行删除。
      第一步 disable ‘表名称’ 第二步 drop ‘表名称’
      查看全部记录: scan “表名称”
      查看某个表某个列中全部数据: scan “表名称” {COLUMNS=>’列族名称:列名称’}
      更新记录: 就是重写一遍进行覆盖

    eg:
    表users,有三个列族user_id,address,info:
    创建表:

    create 'users','user_id','address','info'

    列出全部表

     list

    得到表的描写叙述

     describe 'users'

    删除表:

    disable  ‘user’
    dropuser

    加入记录:

    put 'users','xiaoming','info:age','24'

    获取一条记录
    1.取得一个id的全部数据

    get 'users','xiaoming'

    2.获取一个id,一个列族的全部数据

    get 'users','xiaoming','info' 

    3.获取一个id,一个列族中一个列的 全部数据

    get 'users','xiaoming','info:age'

    更新记录

    put 'users','xiaoming','info:age' ,'29' 
    
    get 'users','xiaoming','info:age' 
    
    put 'users','xiaoming','info:age' ,'30' 
    
    get 'users','xiaoming','info:age' 

    获取单元格数据的版本号数据

    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>1} 
    
    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>2} 
    
    get 'users','xiaoming',{COLUMN=>'info:age',VERSIONS=>3} 

    获取单元格数据的某个版本号数据

    get 'users','xiaoming',{COLUMN=>'info:age',TIMESTAMP=>1364874937056} 

    全表扫描

    scan 'users'

    行数的推断是依据行健来推断的

    HBase 中查找数据的方式
    1. get方式
    2. scan方式

    删除xiaoming值的’info:age’字段

    delete 'users','xiaoming','info:age' 
    
    get 'users','xiaoming' 

    HBase列:put动态添加 delete动态降低
    删除整行

    deleteall 'users','xiaoming'

    统计表的行数

     count 'users'

    清空表

     truncate 'users' 

    坚持下,以下我们看下关于Java针对HBase的操作,这个时候java程序猿应该兴奋才是!

    !!

    HBase的Java_API操作

    
    package hbase;
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    
    /*
     * 创建表、插入记录、查询一条记录、遍历全部记录、删除表
     * */
    public class HBaseApp {
        private static final String TABLE_NAME = "table1";
        private static final String FAMILY_NAME = "family1";
        private static final String  ROW_KEY = "rowkey1";
    
        public static void main(String[] args) throws Exception {
    
        Configuration conf  = HBaseConfiguration.create();  
        conf.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
        //使用eclipse时必须加入这个,否则无法定位
        conf.set("hbase.zookeeper.quorum", "hadoop");
        //创建表、删除表使用HBaseAdmin
       final HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
       createTable(hBaseAdmin);
       //deleteTable(hBaseAdmin);
    
        //插入记录、查询一条记录、遍历全部记录使用HTable 
         final HTable hTable = new HTable(conf, TABLE_NAME);
    //     putRecord(hTable);
    //     getRecord(hTable);
    
          scanTable(hTable);
        }
    
        private static void scanTable(final HTable hTable) throws IOException {
            Scan scan = new Scan();
             final ResultScanner scanner = hTable.getScanner(scan);
    
             for (Result result : scanner) {
                 final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
                 System.out.println(result + "	"+ new String(value));
            }
        }
    
        //查询全部记录
    
    
        //查询一条记录
        private static void getRecord(final HTable hTable) throws IOException {
             Get get = new Get(ROW_KEY.getBytes());
             final Result result = hTable.get(get);
             final byte[] value = result.getValue(FAMILY_NAME.getBytes(),"age".getBytes());
    
             System.out.println(result + "	"+ new String(value));
        }
    
          //插入一条记录
        private static void putRecord(final HTable hTable) throws IOException {
            Put put = new Put(ROW_KEY.getBytes());
             put.add(FAMILY_NAME.getBytes(), "age".getBytes(), "25".getBytes());
             hTable.put(put);
             hTable.close();
        }
    
    
        //删除表
        private static void deleteTable(final HBaseAdmin hBaseAdmin)
                throws IOException {
              hBaseAdmin.disableTable(TABLE_NAME);
              hBaseAdmin.deleteTable(TABLE_NAME);
        }
    
        //创建表
        private static void createTable(final HBaseAdmin hBaseAdmin)
                throws IOException {
            if(!hBaseAdmin.tableExists(TABLE_NAME)){
    //表名
    HTableDescriptor descriptor = new HTableDescriptor(TABLE_NAME); 
    //加入列族             
    HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);                           
    descriptor.addFamily(family);
    hBaseAdmin.createTable(descriptor);
               }
        }   
    }
    2>:
    hdfs数据批量导入到hbase中
    
    create 'wlan_log', 'cf' 
    package hbase;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    
    public class BatchImport {
        static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
            SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
            Text v2 = new Text();
    
            protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
                final String[] splited = value.toString().split("	");
                try {
                    final Date date = new Date(Long.parseLong(splited[0].trim()));
                    final String dateFormat = dateformat1.format(date);
                    String rowKey = splited[1]+":"+dateFormat;
                    v2.set(rowKey+"	"+value.toString());
                    context.write(key, v2);
                } catch (NumberFormatException e) {
                    final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                    counter.increment(1L);
                    System.out.println("出错了"+splited[0]+" "+e.getMessage());
                }
            };
        }
    
        static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
            protected void reduce(LongWritable key, java.lang.Iterable<Text> values,    Context context) throws java.io.IOException ,InterruptedException {
                for (Text text : values) {
                    final String[] splited = text.toString().split("	");
    
                    final Put put = new Put(Bytes.toBytes(splited[0]));
                    put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
                    put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
                    //省略其它字段。调用put.add(....)就可以
                    context.write(NullWritable.get(), put);
                }
            };
        }
    
    
        public static void main(String[] args) throws Exception {
            final Configuration configuration = new Configuration();
            //设置zookeeper
            configuration.set("hbase.zookeeper.quorum", "hadoop");
            //设置hbase表名称
            configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
            //将该值改大,防止hbase超时退出
            configuration.set("dfs.socket.timeout", "180000");
    
            final Job job = new Job(configuration, "HBaseBatchImport");
    
            job.setMapperClass(BatchImportMapper.class);
            job.setReducerClass(BatchImportReducer.class);
            //设置map的输出。不设置reduce的输出类型
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            //不再设置输出路径,而是设置输出格式类型
            job.setOutputFormatClass(TableOutputFormat.class);
    
            FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input");
    
            job.waitForCompletion(true);
        }
    
    }
  • 相关阅读:
    php socket 读取缓存区域
    PHP依赖注入的作用
    谷歌浏览器调试文字都变成font标签的解决方法
    php socket 同步异步堵塞非堵塞的区别
    css3中background-size中的cover与contain的区别
    css3 line-height:0的作用
    RDD的创建方式
    Serializable序列化操作解惑
    SparkCore分布式计算模拟
    spark不同环境下计算pi值
  • 原文地址:https://www.cnblogs.com/lytwajue/p/7380496.html
Copyright © 2011-2022 走看看