zoukankan      html  css  js  c++  java
  • HBase部署与使用

    HBase部署与使用

    概述

    HBase的角色

    HMaster

    功能:

    • 监控RegionServer
    • 处理RegionServer故障转移
    • 处理元数据的变更
    • 处理region的分配或移除
    • 在空闲时间进行数据的负载均衡
    • 通过Zookeeper发布自己的位置给客户端

    RegionServer

    功能

    • 负责存储HBase的实际数据
    • 处理分配个他的Region
    • 刷新缓存到HDFS
    • 维护HLog
    • 执行压缩
    • 负责处理Region分片

    组件:

    • Write-Ahead logs

    HBase 的修改记录,当对 HBase 读写数据的时候,数据不是直接写进磁盘,它会在内存中
    保留一段时间(时间以及数据量阈值可以设定)。但把数据保存在内存中可能有更高的概率
    引起数据丢失,为了解决这个问题,数据会先写在一个叫做 Write-Ahead logfile 的文件中,
    然后再写入内存中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。

    • HFile

    这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件。

    • Store

    HFile 存储在 Store 中,一个 Store 对应 HBase 表中的一个列族。

    • MemStore

    顾名思义,就是内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在 WAL
    中之后,RegsionServer 会在内存中存储键值对

    • Region

    Hbase 表的分片,HBase 表会根据 RowKey 值被切分成不同的 region 存储在 RegionServer 中,
    在一个 RegionServer 中可以有多个不同的 region。

    HBase的架构

    HBase 一种是作为存储的分布式文件系统,另一种是作为数据处理模型的 MR 框架。因为日
    常开发人员比较熟练的是结构化的数据进行处理,但是在 HDFS 直接存储的文件往往不具
    有结构化,所以催生出了 HBase 在 HDFS 上的操作。如果需要查询数据,只需要通过键值
    便可以成功访问。

    架构图如下图所示:

    在这里插入图片描述

    HBase 内置有 Zookeeper,但一般我们会有其他的 Zookeeper 集群来监管 master 和
    regionserver,Zookeeper 通过选举,保证任何时候,集群中只有一个活跃的 HMaster,HMaster
    与 HRegionServer 启动时会向 ZooKeeper 注册,存储所有 HRegion 的寻址入口,实时监控
    HRegionserver 的上线和下线信息。并实时通知给 HMaster,存储 HBase 的 schema 和 table
    元数据,默认情况下,HBase 管理 ZooKeeper 实例,Zookeeper 的引入使得 HMaster 不再是
    单点故障。一般情况下会启动两个 HMaster,非 Active 的 HMaster 会定期的和 Active HMaster
    通信以获取其最新状态,从而保证它是实时更新的,因而如果启动了多个 HMaster 反而增加
    了 Active HMaster 的负担。
    一个 RegionServer 可以包含多个 HRegion,每个 RegionServer 维护一个 HLog,和多个 HFiles
    以及其对应的 MemStore。RegionServer 运行于 DataNode 上,数量可以与 DatNode 数量一致,

    架构图:
    在这里插入图片描述

    HBase部署与使用

    部署

    前提:
    zookeeper正常部署
    hadoop正常部署

    1.解压HBase程序包
    2.修改配置文件
    hbase-env.sh

    export JAVA_HOME=/home/admin/modules/jdk1.8.0_121
    export HBASE_MANAGES_ZK=false
    

    hbase-site.xml

    <configuration>
    	<property>
    		<name>hbase.rootdir</name>
    		<value>hdfs://linux01:8020/hbase</value>
    	</property>
    	<property>
    		<name>hbase.cluster.distributed</name>
    		<value>true</value>
    	</property>
    	<!-- 0.98 后的新变动,之前版本没有.port,默认端口为 60000 -->
    	<property>
    		<name>hbase.master.port</name>
    		<value>16000</value>
    	</property>
    	<property>
    		<name>hbase.zookeeper.quorum</name>
    		<value>linux01:2181,linux02:2181,linux03:2181</value>
    	</property>
    	<property>
    		<name>hbase.zookeeper.property.dataDir</name>
    		<value>/home/admin/modules/zookeeper-3.4.5/zkData</value>
    	</property>
    </configuration>
    

    regionservers:

    hadoop102
    hadoop103
    hadoop104
    

    3.jar包问题

    由于 HBase 需要依赖 Hadoop,所以替换 HBase 的 lib 目录下的 jar 包,以解决兼容问题:

    3.1 删除原有的 jar

    $ rm -rf /home/admin/modules/hbase-1.3.1/lib/hadoop-*
    $ rm -rf /home/admin/modules/hbase-1.3.1/lib/zookeeper-3.4.6.jar
    

    3.2 拷贝新 jar,涉及的 jar 有:

    hadoop-annotations-2.7.2.jar
    hadoop-auth-2.7.2.jar
    hadoop-client-2.7.2.jar
    hadoop-common-2.7.2.jar
    hadoop-hdfs-2.7.2.jar
    hadoop-mapreduce-client-app-2.7.2.jar
    hadoop-mapreduce-client-common-2.7.2.jar
    hadoop-mapreduce-client-core-2.7.2.jar
    hadoop-mapreduce-client-hs-2.7.2.jar
    hadoop-mapreduce-client-hs-plugins-2.7.2.jar
    hadoop-mapreduce-client-jobclient-2.7.2.jar
    hadoop-mapreduce-client-jobclient-2.7.2-tests.jar
    hadoop-mapreduce-client-shuffle-2.7.2.jar
    hadoop-yarn-api-2.7.2.jar
    hadoop-yarn-applications-distributedshell-2.7.2.jar
    hadoop-yarn-applications-unmanaged-am-launcher-2.7.2.jar
    hadoop-yarn-client-2.7.2.jar
    hadoop-yarn-common-2.7.2.jar
    hadoop-yarn-server-applicationhistoryservice-2.7.2.jar
    hadoop-yarn-server-common-2.7.2.jar
    hadoop-yarn-server-nodemanager-2.7.2.jar
    hadoop-yarn-server-resourcemanager-2.7.2.jar
    hadoop-yarn-server-tests-2.7.2.jar
    hadoop-yarn-server-web-proxy-2.7.2.jar
    zookeeper-3.4.5.jar
    

    这些 jar 包的对应版本应替换成你目前使用的 hadoop 版本,具体情况具体分析。

    查找 jar 包举例:

    $ find /home/admin/modules/hadoop-2.7.2/ -name hadoop-annotations*
    

    4.HBase 软连接 Hadoop 配置

    $ ln -s ~/modules/hadoop-2.7.2/etc/hadoop/core-site.xml
    ~/modules/hbase-1.3.1/conf/core-site.xml
    $ ln -s ~/modules/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
    ~/modules/hbase-1.3.1/conf/hdfs-site.xml
    

    5.HBase 服务的启动
    5.1 启动方式 1

    $ bin/hbase-daemon.sh start master
    $ bin/hbase-daemon.sh start regionserver
    

    如果集群之间的节点时间不同步,会导致 regionserver 无法启动,抛出ClockOutOfSyncException 异常

    5.1 启动方式 2

    $ bin/start-hbase.sh
    

    对应的停止服务:

    $ bin/stop-hbase.sh
    
    如 果 使 用 的 是 JDK8 以 上 版 本 , 则 应 在 hbase-evn.sh 中 移 除
    “HBASE_MASTER_OPTS”和“HBASE_REGIONSERVER_OPTS”配置
    

    6.查看HBASE页面
    http://hadoop102:16010

    简单使用

    1.基本操作
    (1)进入HBase客户端命令行

    	$ bin/hbase shell	
    

    (2)查看帮助命令

    	hbase(main)> help
    

    (3)查看当前数据库中有哪些表

    	hbase(main)> list
    

    2.表的操作
    (1)创建表

    create 'student','info'
    

    (2)插入数据到表

    hbase(main) > put 'student','1001','info:name','Thomas'
    hbase(main) > put 'student','1001','info:sex','male'
    hbase(main) > put 'student','1001','info:age','18'
    hbase(main) > put 'student','1002','info:name','Janna'
    hbase(main) > put 'student','1002','info:sex','female'
    hbase(main) > put 'student','1002','info:age','20'
    

    (3)扫描查看数据

    hbase(main) > scan 'student'
    hbase(main) > scan 'student',{STARTROW => '1001', STOPROW => '1001'}
    hbase(main) > scan 'student',{STARTROW => '1001'}
    

    (4)查看表结构

    hbase(main):012:0> describe ‘student’
    

    (5)更新指定字段数据

    hbase(main) > put 'student','1001','info:name','Nick'
    hbase(main) > put 'student','1001','info:age','100'
    

    (6)查看“指定行”或“指定列族:列”的数据

    hbase(main) > get 'student','1001'
    hbase(main) > get 'student','1001','info:name'
    

    (7)删除数据

    删除某 某 rowkey 的 的 全部 数据
    hbase(main) > deleteall 'student','1001'
    
    删除某 rowkey  的某一列 数据
    hbase(main) > delete 'student','1002','info:sex'
    

    (8)清空表数据

    hbase(main) > truncate 'student'
    清空表的操作顺序为先 disable,然后再 truncating。
    
    

    (9)删除表

    hbase(main) > disable 'student'
    hbase(main) > drop 'student'
    

    (10)统计数据行数

    hbase(main) > count 'student'
    

    (11)变更表信息

    将 info 列族中的数据存放 3 个版本
    hbase(main) > alter 'student',{NAME=>'info',VERSIONS=>3}
    

    读写流程

    HBase读数据流程

    1) HRegionServer 保存着 meta 表以及表数据,要访问表数据,首先 Client 先去访问
    zookeeper,从 zookeeper 里面获取 meta 表所在的位置信息,即找到这个 meta 表在哪个
    HRegionServer 上保存着。
    2) 接着 Client 通过刚才获取到的 HRegionServer 的 IP 来访问 Meta 表所在的HRegionServer,从而读取到 Meta,进而获取到 Meta 表中存放的元数据。
    3) Client 通过元数据中存储的信息,访问对应的 HRegionServer,然后扫描所在
    HRegionServer 的 Memstore 和 Storefile 来查询数据。
    4) 最后 HRegionServer 把查询到的数据响应给 Client。
    

    HBase写数据流程

    1) Client 也是先访问 zookeeper,找到 Meta 表,并获取 Meta 表信息。
    2) 确定当前将要写入的数据所对应的 RegionServer 服务器和 Region。
    3) Client 向该 RegionServer 服务器发起写入数据请求,然后 RegionServer 收到请求并响应。
    4) Client 先把数据写入到 HLog,以防止数据丢失。
    5) 然后将数据写入到 Memstore。
    6) 如果 Hlog 和 Memstore 均写入成功,则这条数据写入成功。在此过程中,如果 Memstore达到阈值,
    会把 Memstore 中的数据 flush 到 StoreFile 中。
    7) 当 Storefile 越来越多,会触发 Compact 合并操作,把过多的 Storefile 合并成一个大的Storefile。
    当 Storefile 越来越大,Region 也会越来越大,达到阈值后,会触发 Split 操作,将 Region 一分为二。
    
    尖叫提示:因为内存空间是有限的,所以说溢写过程必定伴随着大量的小文件产生。
    

    JAVAAPI

    1. 新建Maven工程
      pom.xml
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.1</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.3.1</version>
            </dependency>
    
    1. 编写HBaseAPI
    package top.wintp.hbasetest;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    import javafx.scene.control.Tab;
    
    /**
     * @description: description:
     * <p>
     * @author: upuptop
     * <p>
     * @qq: 337081267
     * <p>
     * @CSDN: http://blog.csdn.net/pyfysf
     * <p>
     * @cnblogs: http://www.cnblogs.com/upuptop
     * <p>
     * @blog: http://wintp.top
     * <p>
     * @email: pyfysf@163.com
     * <p>
     * @time: 2019/04/2019/4/23
     * <p>
     */
    public class HBaseDemo {
        private static Logger logger = LoggerFactory.getLogger(HBaseDemo.class);
    
        private static Configuration sConf;
    
        static {
            //    加载配置
            sConf = HBaseConfiguration.create();
        }
    
    
        @Test
        public void testAllFun() throws Exception {
            //    判断表是否存在
            //System.out.println(tableExists("ns_ct:calllog"));
            //创建表
            //createTable("upuptop2", "info");
            //删除表
            dropTable("ns_ct:calllog");
    
            dropNamespace("ns_ct");
    
            //    插入数据
            //insertData("upuptop", "1004", "info", "name", "upuptop");
            //insertData("upuptop", "1003", "info", "name", "pyfysf");
            //insertData("upuptop", "1002", "info", "name", "wintp.top");
            //insertData("upuptop", "1001", "info", "name", "sfok.top");
    
            //    删除数据
            //deleteData("upuptop","1004");
    
            //    查询所有数据
            //getAllRow("ns_ct:calllog");
    
            //    查询单条数据
            //getRow("upuptop", "1001");
        }
    
        /**
         * 获取一行数据
         *
         * @param tableName 表名
         * @param rowKey
         * @throws IOException
         */
        private void getRow(String tableName, String rowKey) throws IOException {
            Connection connection = ConnectionFactory.createConnection(sConf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            Cell[] cells = result.rawCells();
    
            for (Cell cell : cells) {
                byte[] rowArray = CellUtil.cloneRow(cell);
                logger.info("HBaseDemo  getAllRow()   rowArray   " + Bytes.toString(rowArray));
    
                byte[] familyArray = CellUtil.cloneFamily(cell);
                logger.info("HBaseDemo  getAllRow()   familyStr   " + Bytes.toString(familyArray));
    
                byte[] valueArray = CellUtil.cloneValue(cell);
                logger.info("HBaseDemo  getAllRow()   valueArray   " + Bytes.toString(valueArray));
    
            }
            table.close();
            connection.close();
        }
    
        /**
         * 获取所有的数据
         *
         * @param tableName 表名
         */
        private void getAllRow(String tableName) throws IOException {
            //    获取连接
            Connection connection = ConnectionFactory.createConnection(sConf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            //可以在这里给scan加过滤条件 类似于addQuery() addWhere()
            ResultScanner scanner = table.getScanner(scan);
    
            for (Result result : scanner) {
                Cell[] cells = result.rawCells();
    
                for (Cell cell : cells) {
                    byte[] rowArray = CellUtil.cloneRow(cell);
                    logger.info("HBaseDemo  getAllRow()   rowArray   " + Bytes.toString(rowArray));
    
                    byte[] familyArray = CellUtil.cloneFamily(cell);
                    logger.info("HBaseDemo  getAllRow()   familyStr   " + Bytes.toString(familyArray));
    
                    byte[] valueArray = CellUtil.cloneValue(cell);
                    logger.info("HBaseDemo  getAllRow()   valueArray   " + Bytes.toString(valueArray));
    
                }
            }
    
            table.close();
            connection.close();
    
        }
    
        /**
         * 删除数据
         *
         * @param tableName 表名
         * @param rowKey    rowkey
         * @throws IOException
         */
        private void deleteData(String tableName, String rowKey) throws IOException {
            Connection connection = ConnectionFactory.createConnection(sConf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            table.close();
            connection.close();
        }
    
        /**
         * 插入数据
         *
         * @param tableName 表名
         * @param rowKey    行
         * @param family    列族
         * @param qualifier 具体的列名称
         * @param value     值
         * @throws IOException
         */
        private void insertData(String tableName, String rowKey, String family, String qualifier, String value) throws IOException {
            //    所有表的操作均为table对象
            Connection connection = ConnectionFactory.createConnection(sConf);
            //获取table对象
            Table table = connection.getTable(TableName.valueOf(tableName));
            //创建put对象 只要是添加一行的操作都是用put进行的
            Put put = new Put(Bytes.toBytes(rowKey));
    
            //列族 列名 值
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
    
            table.put(put);
    
    
            table.close();
            connection.close();
        }
    
        /**
         * 删除表
         *
         * @param tableName 表名称
         */
        private void dropTable(String tableName) throws IOException {
            //    获取连接
            Connection connection = ConnectionFactory.createConnection(sConf);
            Admin admin = connection.getAdmin();
    
            if (!tableExists(tableName)) {
                //    表不存在
                logger.info("HBaseDemo  dropTable()   数据表【{}】不存在   ", tableName);
                return;
            }
    
            //判断表是否禁用了
            if (!admin.isTableDisabled(TableName.valueOf(tableName))) {
                //   未禁用
                admin.disableTable(TableName.valueOf(tableName));
            }
    
            //删除表 必须要将表进行禁用 TableNotDisabledException
            admin.deleteTable(TableName.valueOf(tableName));
    
            admin.close();
            connection.close();
        }
    
        /**
         * 删除命名空间
         *
         * @param spaceName
         * @throws IOException
         */
        private void dropNamespace(String spaceName) throws IOException {
            //    获取连接
            Connection connection = ConnectionFactory.createConnection(sConf);
            Admin admin = connection.getAdmin();
    
            admin.deleteNamespace(spaceName);
    
            admin.close();
            connection.close();
        }
    
    
        /**
         * 创建表
         *
         * @param tableName  表名
         * @param familyName 列族名  可以有多个
         * @throws IOException
         */
        public void createTable(String tableName, String... familyName) throws IOException {
            //通过工厂类获取到连接对象
            Connection connection = ConnectionFactory.createConnection(sConf);
            //通过连接对象获取到操作表的admin对象
            Admin admin = connection.getAdmin();
            //创建表的描述对象
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
    
            for (String cf : familyName) {
                //创建列族对象
                HColumnDescriptor family = new HColumnDescriptor(cf);
                //为表的描述对象添加列族对象
                hTableDescriptor.addFamily(family);
            }
    
            //判断表是否存在
            if (tableExists(tableName)) {
                //    存在即不创建
                logger.info("HBaseDemo  createTable()   数据表已存在   ");
                return;
            }
            //创建表
            admin.createTable(hTableDescriptor);
            logger.info("HBaseDemo  createTable()   创建表【{}】成功   ", tableName);
    
            admin.close();
            connection.close();
        }
    
    
        /**
         * 判断表是否存在
         *
         * @param tableName
         */
        public boolean tableExists(String tableName) throws IOException {
            //获得连接
            Connection connection = ConnectionFactory.createConnection(sConf);
            //获取操作表的admin对象
            Admin admin = connection.getAdmin();
            //判断表是否存在
            boolean result = admin.tableExists(TableName.valueOf(tableName));
    
    
            admin.close();
            connection.close();
    
            return result;
        }
    
    
    }
    
    

    在这里插入图片描述

    MapReduce

    通过 HBase 的相关 JavaAPI,我们可以实现伴随 HBase 操作的 MapReduce 过程,比如使用
    MapReduce 将数据从本地文件系统导入到 HBase 的表中,比如我们从 HBase 中读取一些原
    始数据后使用 MapReduce 做数据分析。

    官方的HBASE-MapReduce执行

    $ bin/hbase mapredcp
    
    1. 执行环境变量的导入
    $ export HBASE_HOME=/home/admin/modules/hbase-1.3.1
    $ export HADOOP_HOME=/home/admin/modules/hadoop-2.7.2
    $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
    
    1. 运行官方的 MapReduce 任务

    – 案例一:统计 Student

    $ ~/modules/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
    

    – 案例二:用 使用 MapReduce 将本地数据导入到 HBase

    (1) 在本地创建一个 tsv 格式的文件:fruit.tsv

    1001  Apple  Red
    1002  Pear Yellow
    1003  Pineapple Yellow
    

    (2) 创建 HBase 表

    hbase(main):001:0> create 'fruit','info'
    

    (3) 在 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件

    $ ~/modules/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
    $ ~/modules/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
    

    (4) 执行 MapReduce 到 到 HBase 的 的 fruit

    $ ~/modules/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv 
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit 
    hdfs://linux01:8020/input_fruit
    

    (5)使用 scan 命令查看导入后的结果

    hbase(main):001:0> scan ‘fruit’
    

    自定义 HBase-MapReduce1

    目标:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中

    1. 构建 ReadFruitMapper 类,用于读取 fruit 表中的数据
    
    public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
    //将 fruit 的 name 和 color 提取出来,相当于将每一行数据读取出来放入到 Put 对象中。
            Put put = new Put(key.get());
    //遍历添加 column 行
            for (Cell cell : value.rawCells()) {
    //添加/克隆列族:info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
    //添加/克隆列:name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
    //将该列 cell 加入到 put 对象中
                        put.add(cell);
    //添加/克隆列:color
                    } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
    //向该列 cell 加入到 put 对象中
                        put.add(cell);
                    }
                }
            }//将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出
            context.write(key, put);
        }
    }
    
    1. 构建 WriteFruitMRReducer 类,用于将读取到的 fruit 表中的数据写入到 fruit_mr 表中
    
    public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
    //读出来的每一行数据写入到 fruit_mr 表中
            for (Put put : values) {
                context.write(NullWritable.get(), put);
            }
        }
    }
    
    1. 构建 Fruit2FruitMRRunner extends Configured implements Tool 用于组装运行 Job 任务
    //组装 Job
        public int run(String[] args) throws Exception {
            //得到 Configuration
            Configuration conf = this.getConf();
            //创建 Job 任务
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            job.setJarByClass(Fruit2FruitMRRunner.class);
            //配置 Job
            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setCaching(500);
            //设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者是老 版本
            TableMapReduceUtil.initTableMapperJob(
                    "fruit", //数据源的表名
                    scan, //scan 扫描控制器
                    ReadFruitMapper.class,//设置 Mapper 类
                    ImmutableBytesWritable.class,//设置 Mapper 输出 key 类型
                    Put.class,//设置 Mapper 输出 value 值类型
                    job//设置给哪个 JOB
            );
            //设置 Reducer
            TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class,
                    job);
            //设置 Reduce 数量,最少 1 个
            job.setNumReduceTasks(1);
            boolean isSuccess = job.waitForCompletion(true);
            if (!isSuccess) {
                throw new IOException("Job running with error");
            }
            return isSuccess ? 0 : 1;
        }
        
    	public static void main( String[] args ) throws Exception{
    		Configuration conf = HBaseConfiguration.create();
    		int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
    		System.exit(status);
    	}
    
    1. 打包运行
    $ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar
    com.z.hbase.mr1.Fruit2FruitMRRunner
    

    自定义 HBase-MapReduce2

    目标:实现将 HDFS 中的数据写入到 HBase 表中

    1. 构建 ReadFruitFromHDFSMapper 于读取 HDFS 中的文件数据
    public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text,
            ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException,
                InterruptedException {
    //从 HDFS 中读取的数据
            String lineValue = value.toString();
    //读取出来的每行数据使用	 进行分割,存于 String 数组
            String[] values = lineValue.split("	");
    //根据数据中值的含义取值
            String rowKey = values[0];
            String name = values[1];
            String color = values[2];
    //初始化 rowKey
            ImmutableBytesWritable rowKeyWritable = new
                    ImmutableBytesWritable(Bytes.toBytes(rowKey));
    //初始化 put 对象
            Put put = new Put(Bytes.toBytes(rowKey));
    //参数分别:列族、列、值
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
            context.write(rowKeyWritable, put);
        }
    }
    
    1. 构建 WriteFruitMRFromTxtReducer
    
    public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put,
            NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
    //读出来的每一行数据写入到 fruit_hdfs 表中
            for(Put put: values){
                context.write(NullWritable.get(), put);
            }
        }
    }
    
    1. 创建 Txt2FruitRunner 组装 Job
     public int run(String[] args) throws Exception {
    //得到 Configuration
            Configuration conf = this.getConf();
    //创建 Job 任务
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            job.setJarByClass(Txt2FruitRunner.class);
            Path inPath = new Path("hdfs://linux01:8020/input_fruit/fruit.tsv");
            FileInputFormat.addInputPath(job, inPath);
    //设置 Mapper
            job.setMapperClass(ReadFruitFromHDFSMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
    //设置 Reducer
            TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);
    //设置 Reduce 数量,最少 1 个
            job.setNumReduceTasks(1);
            boolean isSuccess = job.waitForCompletion(true);
            if(!isSuccess){
                throw new IOException("Job running with error");
            }
            return isSuccess ? 0 : 1;
        }
    

    与hive的集成

    环境准备

    因为我们后续可能会在操作 Hive 的同时对 HBase 也会产生影响,所以 Hive 需要持有操作
    HBase 的 Jar,那么接下来拷贝 Hive 所依赖的 Jar 包(或者使用软连接的形式)

    $ export HBASE_HOME=/home/admin/modules/hbase-1.3.1
    $ export HIVE_HOME=/home/admin/modules/apache-hive-1.2.2-bin
    $ ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar
    $HIVE_HOME/lib/hbase-common-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar
    $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
    $ ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
    $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
    $ ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
    $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
    

    同时在 hive-site.xml 中修改 zookeeper 的属性,如下:

    <property>
    	<name>hive.zookeeper.quorum</name>
    	<value>linux01,linux02,linux03</value>
    	<description>The list of ZooKeeper servers to talk to. This is only needed for read/write
    	locks.</description>
    </property>
    <property>
    	<name>hive.zookeeper.client.port</name>
    	<value>2181</value>
    	<description>The port of ZooKeeper servers to talk to. This is only needed for read/write
    	locks.</description>
    </property>
    
    1. 案例一 一
      目标:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表。
      分步实现:

    (1) 在 在 Hive 中创建表同时关联 HBase

    CREATE TABLE hive_hbase_emp_table(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string,
    sal double,
    comm double,
    deptno int)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" =
    ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
    TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
    
    完成之后,可以分别进入 Hive 和 HBase 查看,都生成了对应的表
    

    (2) 在 在 Hive 中创建临时中间表,用于 load 文件中的数据

    不能将数据直接 load 进 Hive 所关联 HBase 的那张表中

    CREATE TABLE emp(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string,
    sal double,
    comm double,
    deptno int)
    row format delimited fields terminated by '	';
    

    (3) 向 向 Hive 中间表中 load 数据

    hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
    

    (4) 通过 insert 命令将中间表中的数据导入到 Hive 关联 HBase

    hive> insert into table hive_hbase_emp_table select * from emp;
    

    (5) 查看 Hive 以及关联的 HBase
    Hive

    hive> select * from hive_hbase_emp_table;
    

    HBase :

    hbase> scan ‘hbase_emp_table’
    
    1. 案例二

    目标:在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来
    关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数
    据。
    注:该案例 2 紧跟案例 1 的脚步,所以完成此案例前,请先完成案例 1。

    (1) 在 在 Hive 中创建外部表

    CREATE EXTERNAL TABLE relevance_hbase_emp(
    empno int,
    ename string,
    job string,
    mgr int,
    hiredate string,
    sal double,
    comm double,
    deptno int)
    STORED BY
    'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" =
    ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
    TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
    

    关联后就可以使用 Hive 函数进行一些分析操作了

    hive (default)> select * from relevance_hbase_emp;
    

    与sqoop的集成

    相关参数

    参数 描述
    –column-family <family> Sets the target column family for the import设置导入的目标列族。
    –hbase-create-table If specified, create missing HBase tables是否自动创建不存在的 HBase 表(这就意味着,不需要手动提前在 HBase 中先建立表)
    –hbase-row-key <col> Specifies which input column to use as the rowkey.In case, if input table contains compositekey, then <col> must be in the form of a comma-separated list of composite key attributes. mysql 中哪一列的值作为 HBase 的 rowkey, 如果rowkey是个组合键,则以逗号分隔。 (注:避免 rowkey 的重复)
    –hbase-table <table-name> Specifies an HBase table to use as the targetinstead of HDFS.指定数据将要导入到 HBase 中的哪张表中。
    –hbase-bulkload Enables bulk loading.是否允许 bulk 形式的导入。

    将 RDBMS 中的数据抽取到 HBase 中

    (1) 配置 sqoop-env.sh

    export HBASE_HOME=/home/admin/modules/hbase-1.3.1
    
    

    (2) 在 在 Mysql 中新建一个数据库 db_library ,一张表 book

    CREATE DATABASE db_library;
    CREATE TABLE db_library.book(
    id int(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    price VARCHAR(255) NOT NULL);
    

    (3) 插入数据

    INSERT INTO db_library.book (name, price) VALUES('Lie Sporting', '30');
    INSERT INTO db_library.book (name, price) VALUES('Pride & Prejudice', '70');
    INSERT INTO db_library.book (name, price) VALUES('Fall of Giants', '50');
    

    (4) 执行 Sqoop

    $ bin/sqoop import 
    --connect jdbc:mysql://linux01:3306/db_library 
    --username root 
    --password 123456 
    --table book 
    --columns "id,name,price" 
    --column-family "info" 
    --hbase-create-table 
    --hbase-row-key "id" 
    --hbase-table "hbase_book" 
    --num-mappers 1 
    --split-by id
    

    sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自动创建 HBase 表的功能

    (5)创建HBASE表

    hbase> create 'hbase_book','info'
    

    (5) 在 在 HBase 中 中 scan

    hbase> scan ‘hbase_book’
    

    常用的shell操作

    1. satus 显示服务器状态
    hbase> status ‘linux01’
    
    1. whoami 显示 HBase 当前用户
        hbase> whoami
    
    1. list 显示当前所有的表
        hbase> list
    
    1. count统计指定表的记录数
        hbase> count 'hbase_book'
    
    1. describe 展示表结构信息
        hbase> describe 'hbase_book'
    
    1. exist 检查表是否存在,适用于表量特别多的情况
        hbase> exist 'hbase_book'
    
    1. is_enabled/is_disabled 检查表是否启用或禁用
        hbase> is_enabled 'hbase_book'
        hbase> is_disabled 'hbase_book'
    
    1. alter 该命令可以改变表和列族的模式
        为当前表增加列族:
        hbase> alter 'hbase_book', NAME => 'CF2', VERSIONS => 2
        为当前表删除列族:
        hbase> alter 'hbase_book', 'delete' => ’CF2’
    
    1. disable 禁用一张表
    hbase> disable 'hbase_book'
    
    1. drop 删除一张表,记得在删除表之前必须先禁用
    hbase> drop 'hbase_book'
    
    1. delete 删除一行中一个单元格的值
    hbase> delete ‘hbase_book’, ‘rowKey’, ‘CF:C’
    
    1. truncate 清空表数据,即禁用表-删除表-创建表
    hbase> truncate 'hbase_book'
    
    1. create 创建表
    hbase> create ‘table’, ‘cf’
    

    创建多个列族:

    hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
    

    数据的备份与操作

    备份

    停止 HBase 服务后,使用 distcp 命令运行 MapReduce 任务进行备份,将数据备份到另一个
    地方,可以是同一个集群,也可以是专用的备份集群。
    即,把数据转移到当前集群的其他目录下(也可以不在同一个集群中):

    $ bin/hadoop distcp 
    hdfs://linux01:8020/hbase 
    hdfs://linux01:8020/HbaseBackup/backup20171009
    

    执行该操作,一定要开启 Yarn 服务

    恢复

    $ bin/hadoop distcp 
    hdfs://linux01:8020/HbaseBackup/backup20170930 
    hdfs://linux01:8020/hbase
    

    节点的管理

    服役( (commissioning )

    当启动 regionserver 时,regionserver 会向 HMaster 注册并开始接收本地数据,开始的时候,
    新加入的节点不会有任何数据,平衡器开启的情况下,将会有新的 region 移动到开启的
    RegionServer 上。如果启动和停止进程是使用 ssh 和 HBase 脚本,那么会将新添加的节点的
    主机名加入到 conf/regionservers 文件中。

    退役( (decommissioning )

    顾名思义,就是从当前 HBase 集群中删除某个 RegionServer,这个过程分为如下几个过程:

    1. 停止负载平衡器
    hbase> balance_switch false
    
    1. 在退役止 节点上停止 RegionServer
    hbase> hbase-daemon.sh stop regionserver
    
    1. RegionServer 一旦停止,会关闭维护的所有 region
    2. Zookeeper 上的该 RegionServer 节点消失
    3. Master 节点检测到该 RegionServer 下线
    4. RegionServer 的 的 region 服务得到重新分配
      该关闭方法比较传统,需要花费一定的时间,而且会造成部分 region 短暂的不可用。

    另一种方案:

    1. RegionServer 先卸载所管理的 region
    $ bin/graceful_stop.sh <RegionServer-hostname>
    
    1. 自动平衡数据
    2. 和之前的 2~6

    版本的确界

    1. 版本的下界
      默认的版本下界是 0,即禁用。row 版本使用的最小数目是与生存时间(TTL Time To Live)
      相结合的,并且我们根据实际需求可以有 0 或更多的版本,使用 0,即只有 1 个版本的值写
      入 cell。
    2. 版本的上界
      之前默认的版本上界是 3,也就是一个 row 保留 3 个副本(基于时间戳的插入)。该值不要
      设计的过大,一般的业务不会超过 100。如果 cell 中存储的数据版本号超过了 3 个,再次插
      入数据时,最新的值会将最老的值覆盖。(现版本已默认为 1)

    本博客仅为博主学习总结,感谢各大网络平台的资料。蟹蟹!!

  • 相关阅读:
    【BZOJ】2209: [Jsoi2011]括号序列(splay)
    【BZOJ】1251: 序列终结者(splay)
    【UOJ】【UR #2】猪猪侠再战括号序列(splay/贪心)
    【BZOJ】3781: 小B的询问(莫队算法)
    【BZOJ】3289: Mato的文件管理(莫队算法+树状数组)
    【BZOJ】2038: [2009国家集训队]小Z的袜子(hose)(组合计数+概率+莫队算法+分块)
    【SPOJ】7258. Lexicographical Substring Search(后缀自动机)
    vue 中 event.stopPropagation() 和event.preventDefault() 使用
    Vue 中 使用v-show
    Vue v-if以及 v-else 的使用
  • 原文地址:https://www.cnblogs.com/shaofeer/p/11154282.html
Copyright © 2011-2022 走看看