zoukankan      html  css  js  c++  java
  • hbase ---有用

    目录

    1. hbase简介 1

    1.1. 什么是hbase 1

    1.2. 与传统数据库的对比 1

    1.3. hbase集群中的角色 1

    2. habse安装 2

    2.1. hbase安装 2

    2.1.1. 上传 2

    2.1.2. 解压 2

    2.1.3. 重命名 2

    2.1.4. 修改环境变量(每台机器都要执行) 2

    2.1.5. 修改配置文件 2

    2.1.6. 分发到其他节点 2

    2.1.7. 启动 3

    2.1.8. 监控 3

    3. hbase数据模型 3

    3.1. hbase数据模型 3

    3.1.1. Row Key 3

    3.1.2. Columns Family 4

    3.1.3. Cell 4

    3.1.4. Time Stamp 4

    4. hbase命令 4

    4.1. 命令的进退 4

    4.2. 命令 5

    5. hbase依赖zookeeper 6

    6. hbase开发 6

    6.1. 配置 6

    6.2. 表管理类 7

    6.3. 表描述类 7

    6.4. 列族的描述类 7

    6.5. 创建表的操作 7

    6.6. 删除表 8

    6.7. 创建一个表的类 8

    6.8. 单条插入数据 8

    6.9. 批量插入 9

    6.10. 删除数据 9

    6.11. 单条查询 9

    6.12. 批量查询 10

    6.13. hbase过滤器 10

    6.13.1. FilterList 10

    6.13.2. 过滤器的种类 11

    6.13.3. 列植过滤器—SingleColumnValueFilter 11

    6.13.4. 列名前缀过滤器—ColumnPrefixFilter 12

    6.13.5. 多个列值前缀过滤器—MultipleColumnPrefixFilter 12

    6.13.6. rowKey过滤器—RowFilter 12

    7. hbase原理 12

    7.1. 体系图 12

    7.1.1. 写流程 13

    7.1.2. 数据flush过程 13

    7.1.3. hbase的读流程 13

    7.1.4. hmaster的职责 13

    7.1.5. hregionserver的职责 13

    7.1.6. client职责 14

    8. 总结 14

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    hbase

    1. hbase简介

    1.1. 什么是hbase

    HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。

    HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

    HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。

    1.2. 与传统数据库的对比

    1、传统数据库遇到的问题:

    1)数据量很大的时候无法存储

    2)没有很好的备份机制

    3)数据达到一定数量开始缓慢,很大的话基本无法支撑

     2、HBASE优势:

    1)线性扩展,随着数据量增多可以通过节点扩展进行支撑

    2)数据存储在hdfs上,备份机制健全

    3)通过zookeeper协调查找数据,访问速度块。

    1.3. hbase集群中的角色

    1、一个或者多个主节点,Hmaster

    2、多个从节点,HregionServer

    2. habse安装

    2.1. hbase安装a

    2.1.1. 上传

    用工具上传

    2.1.2. 解压

    su – hadoop

    tar -zxvf hbase-0.94.6.tar.gz

    2.1.3. 重命名

    mv hbase-0.94.6 hbase

    2.1.4. 修改环境变量(每台机器都要执行)

    su – root

    vi /etc/profile

    添加内容:

    export HBASE_HOME=/home/hadoop/hbase

    export PATH=$PATH:$HBASE_HOME/bin

    source /etc/proflie

    su - hadoop

    2.1.5. 修改配置文件

    上传配置文件

    2.1.6. 分发到其他节点

    scp -r /home/hadoop/hbase hadoop@slave1:/home/hadoop/

    scp -r /home/hadoop/hbase hadoop@slave2:/home/hadoop/

    scp -r /home/hadoop/hbase hadoop@slave3:/home/hadoop/

    2.1.7. 启动

    注意:启动hbase之前,必须保证hadoop集群和zookeeper集群是可用的。

    start-hbase.sh

    2.1.8. 监控

    1、 进入命令行

    hbase shell

    2、 页面监控

    http://master:60010/

    3. hbase数据模型

    3.1. hbase数据模型

     

    3.1.1. Row Key  

    与nosql数据库们一样,row key是用来检索记录的主键。访问HBASE table中的行,只有三种方式:

    1.通过单个row key访问

    2.通过row key的range(正则)

    3.全表扫描

    Row key行键 (Row key)可以是任意字符串(最大长度 是 64KB,实际应用中长度一般为 10-100bytes),在HBASE内部,row key保存为字节数组。存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

    3.1.2. Columns Family

    列簇HBASE表中的每个列,都归属于某个列族列族是表的schema的一部 分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如 courses:history,courses:math都属于courses 这个列族。

    3.1.3. Cell

    由{row key, columnFamily, version} 唯一确定的单元。cell中 的数据是没有类型的,全部是字节码形式存贮

    关键字:无类型、字节码

    3.1.4. Time Stamp

    HBASE 中通过rowkey和columns确定的为一个存贮单元称为cell。每个 cell都保存 着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由HBASE(在数据写入时自动 )赋值,此时时间戳是精确到毫秒 的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版 本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

    为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,HBASE提供 了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段 时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。

    4. hbase命令

    4.1. 命令的进退

    1、hbase提供了一个shell的终端给用户交互

    #$HBASE_HOME/bin/hbase shell 

    2、如果退出执行quit命令

    #$HBASE_HOME/bin/hbase shell

    …… 

    >quit

    4.2. 命令

    名称

    命令表达式

    创建表

    create '表名', '列族名1','列族名2','列族名N'

    查看所有表

    list

    描述表

    describe  ‘表名’

    判断表存在

    exists  '表名'

    判断是否禁用启用表

    is_enabled '表名'

    is_disabled ‘表名’

    添加记录      

    put  ‘表名’, ‘rowKey’, ‘列族 : 列‘  ,  '值'

    查看记录rowkey下的所有数据

    get  '表名' , 'rowKey'

    查看表中的记录总数

    count  '表名'

    获取某个列族

    get '表名','rowkey','列族'

    获取某个列族的某个列

    get '表名','rowkey','列族:列’

    删除记录

    delete  ‘表名’ ,‘行名’ , ‘列族:列'

    删除整行

    deleteall '表名','rowkey'

    删除一张表

    先要屏蔽该表,才能对该表进行删除

    第一步 disable ‘表名’ ,第二步  drop '表名'

    清空表

    truncate '表名'

    查看所有记录

    scan "表名"  

    查看某个表某个列中所有数据

    scan "表名" , {COLUMNS=>'列族名:列名'}

    更新记录

    就是重写一遍,进行覆盖,hbase没有修改,都是追加

     

    5. hbase依赖zookeeper

    1、 保存Hmaster的地址和backup-master地址

    hmaster:

    a) 管理HregionServer

    b) 做增删改查表的节点

    c) 管理HregionServer中的表分配

    2、 保存表-ROOT-的地址

    hbase默认的根表,检索表。

    3、 HRegionServer列表

    表的增删改查数据。

    和hdfs交互,存取数据

    6. hbase开发

    6.1. 配置

    HBaseConfiguration

    包:org.apache.hadoop.hbase.HBaseConfiguration

    作用:通过此类可以对HBase进行配置

    用法实例:

    Configuration config = HBaseConfiguration.create();

    说明: HBaseConfiguration.create() 默认会从classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration。

     

    使用方法:

    static Configuration config = null;

    static {

         config = HBaseConfiguration.create();

         config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

         config.set("hbase.zookeeper.property.clientPort", "2181");

    }

    6.2. 表管理类

    HBaseAdmin

    包:org.apache.hadoop.hbase.client.HBaseAdmin

    作用:提供接口关系HBase 数据库中的表信息

     

    用法:

    HBaseAdmin admin = new HBaseAdmin(config);

    6.3. 表描述类

    HTableDescriptor

    包:org.apache.hadoop.hbase.HTableDescriptor

    作用:HTableDescriptor 类包含了表的名字以及表的列族信息

              表的schema(设计)

    用法:

    HTableDescriptor htd =new HTableDescriptor(tablename);

    htd.addFamily(new HColumnDescriptor(“myFamily”));

    6.4. 列族的描述类

    HColumnDescriptor

    包:org.apache.hadoop.hbase.HColumnDescriptor

    作用:HColumnDescriptor 维护列族的信息

     

    用法:

    htd.addFamily(new HColumnDescriptor(“myFamily”));

    6.5. 创建表的操作

    CreateTable(一般我们用shell创建表)

    static Configuration config = null;

    static {

         config = HBaseConfiguration.create();

         config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

         config.set("hbase.zookeeper.property.clientPort", "2181");

    }

     

    HBaseAdmin admin = new HBaseAdmin(config);

    HTableDescriptor desc = new HTableDescriptor(tableName);

    HColumnDescriptor family1 = new HColumnDescriptor(“f1”);

    HColumnDescriptor family2 = new HColumnDescriptor(“f2”);

    desc.addFamily(family1);

    desc.addFamily(family2);

    admin.createTable(desc);

    6.6. 删除表

    HBaseAdmin admin = new HBaseAdmin(config);

    admin.disableTable(tableName);

    admin.deleteTable(tableName);

    6.7. 创建一个表的类

    HTable

    包:org.apache.hadoop.hbase.client.HTable

    作用:HTable 和 HBase 的表通信

    用法:

    // 普通获取表

     HTable table = new HTable(config,Bytes.toBytes(tablename);

    // 通过连接池获取表

    Connection connection = ConnectionFactory.createConnection(config);

    HTableInterface table = connection.getTable(TableName.valueOf("user"));

    6.8. 单条插入数据

    Put

    包:org.apache.hadoop.hbase.client.Put

    作用:插入数据

    用法:

    Put put = new Put(row);

    p.add(family,qualifier,value);

    说明:向表 tablename 添加 “family,qualifier,value”指定的值。

     

    示例代码:

    Connection connection = ConnectionFactory.createConnection(config);

    HTableInterface table = connection.getTable(TableName.valueOf("user"));

    Put put = new Put(Bytes.toBytes(rowKey));

    put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));

    table.put(put);

    6.9. 批量插入

    批量插入

    List<Put> list = new ArrayList<Put>();

    Put put = new Put(Bytes.toBytes(rowKey));//获取put,用于插入

    put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),Bytes.toBytes(value));//封装信息

    list.add(put);

    table.put(list);//添加记录

    6.10. 删除数据

    Delete

    包:org.apache.hadoop.hbase.client.Delete

    作用:删除给定rowkey的数据

    用法:

    Delete del= new Delete(Bytes.toBytes(rowKey));

    table.delete(del);

    代码实例

    Connection connection = ConnectionFactory.createConnection(config);

    HTableInterface table = connection.getTable(TableName.valueOf("user"));

    Delete del= new Delete(Bytes.toBytes(rowKey));

    table.delete(del);

    6.11. 单条查询

    Get

    包:org.apache.hadoop.hbase.client.Get

    作用:获取单个行的数据

    用法:

    HTable table = new HTable(config,Bytes.toBytes(tablename));

    Get get = new Get(Bytes.toBytes(row));

    Result result = table.get(get);

    说明:获取 tablename 表中 row 行的对应数据

     

    代码示例:

    Connection connection = ConnectionFactory.createConnection(config);

    HTableInterface table = connection.getTable(TableName.valueOf("user"));

    Get get = new Get(rowKey.getBytes());

    Result row = table.get(get);

    for (KeyValue kv : row.raw()) {

    System.out.print(new String(kv.getRow()) + " ");

    System.out.print(new String(kv.getFamily()) + ":");

    System.out.print(new String(kv.getQualifier()) + " = ");

    System.out.print(new String(kv.getValue()));

    System.out.print(" timestamp = " + kv.getTimestamp() + " ");

    }

    6.12. 批量查询

    ResultScanner

    包:org.apache.hadoop.hbase.client.ResultScanner

    作用:获取值的接口

    用法:

    ResultScanner scanner = table.getScanner(scan);

    For(Result rowResult : scanner){

            Bytes[] str = rowResult.getValue(family,column);

    }

    说明:循环获取行中列值。

     

    代码示例:

    Connection connection = ConnectionFactory.createConnection(config);

    HTableInterface table = connection.getTable(TableName.valueOf("user"));

    Scan scan = new Scan();

    scan.setStartRow("a1".getBytes());

    scan.setStopRow("a20".getBytes());

    ResultScanner scanner = table.getScanner(scan);

    for (Result row : scanner) {

    System.out.println(" Rowkey: " + new String(row.getRow()));

    for (KeyValue kv : row.raw()) {

         System.out.print(new String(kv.getRow()) + " ");

         System.out.print(new String(kv.getFamily()) + ":");

         System.out.print(new String(kv.getQualifier()) + " = ");

         System.out.print(new String(kv.getValue()));

         System.out.print(" timestamp = " + kv.getTimestamp() + " ");

    }

    }

    6.13. hbase过滤器

    6.13.1. FilterList

    FilterList 代表一个过滤器列表,可以添加多个过滤器进行查询,多个过滤器之间的关系有:

    与关系(符合所有):FilterList.Operator.MUST_PASS_ALL  

    或关系(符合任一):FilterList.Operator.MUST_PASS_ONE

     

    使用方法:

    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);   

    Scan s1 = new Scan();  

     filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”),  Bytes.toBytes(“c1”),  CompareOp.EQUAL,Bytes.toBytes(“v1”) )  );  

    filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(“f1”),  Bytes.toBytes(“c2”),  CompareOp.EQUAL,Bytes.toBytes(“v2”) )  );  

     // 添加下面这一行后,则只返回指定的cell,同一行中的其他cell不返回  

     s1.addColumn(Bytes.toBytes(“f1”), Bytes.toBytes(“c1”));  

     s1.setFilter(filterList);  //设置filter

     ResultScanner ResultScannerFilterList = table.getScanner(s1);  //返回结果列表

    6.13.2. 过滤器的种类

    过滤器的种类

    列值过滤器—SingleColumnValueFilter

          过滤列植的相等、不等、范围等

    列名前缀过滤器—ColumnPrefixFilter

          过滤指定前缀的列名

    多个列名前缀过滤器—MultipleColumnPrefixFilter

           过滤多个指定前缀的列名

    rowKey过滤器—RowFilter

          通过正则,过滤rowKey值。

    6.13.3. 列植过滤器—SingleColumnValueFilter

    SingleColumnValueFilter 列值判断

    相等 (CompareOp.EQUAL ),

    不等(CompareOp.NOT_EQUAL),

    范围 (e.g., CompareOp.GREATER)…………

    下面示例检查列值和字符串'values' 相等...

    SingleColumnValueFilter f = new  SingleColumnValueFilter(

    Bytes.toBytes("cFamily")              Bytes.toBytes("column"), CompareFilter.CompareOp.EQUAL,

            Bytes.toBytes("values"));

    s1.setFilter(f);

    注意:如果过滤器过滤的列在数据表中有的行中不存在,那么这个过滤器对此行无法过滤。

    6.13.4. 列名前缀过滤器—ColumnPrefixFilter

    过滤器—ColumnPrefixFilter

    ColumnPrefixFilter 用于指定列名前缀值相等

    ColumnPrefixFilter f = new ColumnPrefixFilter(Bytes.toBytes("values"));

    s1.setFilter(f);

    6.13.5. 多个列值前缀过滤器—MultipleColumnPrefixFilter

    MultipleColumnPrefixFilter 和 ColumnPrefixFilter 行为差不多,但可以指定多个前缀

    byte[][] prefixes = new byte[][] {Bytes.toBytes("value1"),Bytes.toBytes("value2")};

    Filter f = new MultipleColumnPrefixFilter(prefixes);

    s1.setFilter(f);

    6.13.6. rowKey过滤器—RowFilter

    RowFilter 是rowkey过滤器

    通常根据rowkey来指定范围时,使用scan扫描器的StartRow和StopRow方法比较好。

    Filter f = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^1234")); //匹配以1234开头的rowkey

    s1.setFilter(f);

    7. hbase原理

    7.1. 体系图

     

    7.1.1. 写流程

    1、 client向hregionserver发送写请求

    2、 hregionserver将数据写到hlog(write ahead log)。为了数据的持久化和恢复。

    3、 hregionserver将数据写到内存(memstore)

    4、 反馈client写成功。

    7.1.2. 数据flush过程

    1、 当memstore数据达到阈值(默认是64M)将数据刷到硬盘,将内存中的数据删除,同时删除Hlog中的历史数据。

    2、 并将数据存储到hdfs中。

    3、 在hlog中做标记点。

    7.1.3. 数据合并过程

    1、 当数据块达到4块,hmaster将数据块加载到本地,进行合并

    2、 当合并的数据超过256M进行拆分,将拆分后的region分配给不同的hregionserver管理

    3、 当hregionser宕机后将hregionserver上的hlog拆分然后分配给不同的hregionserver加载,修改.META.

    4、 注意:hlog会同步到hdfs

    7.1.4. hbase的读流程

    1、 通过zookeeper和-ROOT- .META.表定位hregionserver。

    2、 数据从内存和硬盘合并后返回给client

    3、 数据块会缓存

    7.1.5. hmaster的职责 

    1、管理用户对Table的增、删、改、查操作

    2、记录region在哪台Hregion server上

    3、在Region Split后,负责新Region的分配;

    4、新机器加入时,管理HRegion Server的负载均衡,调整Region分布

    5、在HRegion Server宕机后,负责失效HRegion Server 上的Regions迁移。

    7.1.6. hregionserver的职责

    HRegion Server主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBASE中最核心的模块。

    HRegion Server管理了很多table的分区,也就是region。

    7.1.7. client职责

    Client

    HBASE Client使用HBASE的RPC机制与HMaster和RegionServer进行通信

    管理类操作:Client与HMaster进行RPC;

    数据读写类操作:Client与HRegionServer进行RPC。

    8. MapReduce操作Hbase

    8.1. 实现方法

    Hbase对MapReduce提供支持,它实现了TableMapper类和TableReducer类,我们只需要继承这两个类即可。

    1、写个mapper继承TableMapper<Text, IntWritable>

    参数:Text:mapper的输出key类型; IntWritable:mapper的输出value类型。

          其中的map方法如下: 

    map(ImmutableBytesWritable key, Result value,Context context)

     参数:key:rowKey;value: Result ,一行数据; context上下文

    2、写个reduce继承TableReducer<Text, IntWritable, ImmutableBytesWritable>

    参数:Text:reducer的输入key; IntWritable:reduce的输入value;

     ImmutableBytesWritable:reduce输出到hbase中的rowKey类型。

          其中的reduce方法如下:

    reduce(Text key, Iterable<IntWritable> values,Context context)

    参数: key:reduce的输入key;values:reduce的输入value;

     

    8.2. 准备表

    1、建立数据来源表‘word’,包含一个列族‘content’

    向表中添加数据,在列族中放入列‘info’,并将短文数据放入该列中,如此插入多行,行键为不同的数据即可

     

    2、建立输出表‘stat’,包含一个列族‘content’

     

    3、通过Mr操作Hbase的‘word’表,对‘content:info’中的短文做词频统计,并将统计结果写入‘stat’表的‘content:info中’,行键为单词

    8.3. 实现

    package com.itcast.hbase;

     

    import java.io.IOException;

    import java.util.ArrayList;

    import java.util.List;

     

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.hbase.HBaseConfiguration;

    import org.apache.hadoop.hbase.HColumnDescriptor;

    import org.apache.hadoop.hbase.HTableDescriptor;

    import org.apache.hadoop.hbase.client.HBaseAdmin;

    import org.apache.hadoop.hbase.client.HTable;

    import org.apache.hadoop.hbase.client.Put;

    import org.apache.hadoop.hbase.client.Result;

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

    import org.apache.hadoop.hbase.mapreduce.TableMapper;

    import org.apache.hadoop.hbase.mapreduce.TableReducer;

    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    /**

     * mapreduce操作hbase

     * @author wilson

     *

     */

    public class HBaseMr {

    /**

     * 创建hbase配置

     */

    static Configuration config = null;

    static {

    config = HBaseConfiguration.create();

    config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

    config.set("hbase.zookeeper.property.clientPort", "2181");

    }

    /**

     * 表信息

     */

    public static final String tableName = "word";//表名1

    public static final String colf = "content";//列族

    public static final String col = "info";//列

    public static final String tableName2 = "stat";//表名2

    /**

     * 初始化表结构,及其数据

     */

    public static void initTB() {

    HTable table=null;

    HBaseAdmin admin=null;

    try {

    admin = new HBaseAdmin(config);//创建表管理

    /*删除表*/

    if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {

    System.out.println("table is already exists!");

    admin.disableTable(tableName);

    admin.deleteTable(tableName);

    admin.disableTable(tableName2);

    admin.deleteTable(tableName2);

    }

    /*创建表*/

    HTableDescriptor desc = new HTableDescriptor(tableName);

    HColumnDescriptor family = new HColumnDescriptor(colf);

    desc.addFamily(family);

    admin.createTable(desc);

    HTableDescriptor desc2 = new HTableDescriptor(tableName2);

    HColumnDescriptor family2 = new HColumnDescriptor(colf);

    desc2.addFamily(family2);

    admin.createTable(desc2);

    /*插入数据*/

    table = new HTable(config,tableName);

    table.setAutoFlush(false);

    table.setWriteBufferSize(5);

    List<Put> lp = new ArrayList<Put>();

    Put p1 = new Put(Bytes.toBytes("1"));

    p1.add(colf.getBytes(), col.getBytes(), ("The Apache Hadoop software library is a framework").getBytes());

    lp.add(p1);

    Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());

    lp.add(p2);

    Put p3 = new Put(Bytes.toBytes("3"));

    p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());

    lp.add(p3);

    Put p4 = new Put(Bytes.toBytes("4"));

    p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());

    lp.add(p4);

    Put p5 = new Put(Bytes.toBytes("5"));

    p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());

    lp.add(p5);

    table.put(lp);

    table.flushCommits();

    lp.clear();

    } catch (Exception e) {

    e.printStackTrace();

    } finally {

    try {

    if(table!=null){

    table.close();

    }

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }

    /**

     * MyMapper 继承 TableMapper

     * TableMapper<Text,IntWritable>

     * Text:输出的key类型,

     * IntWritable:输出的value类型

     */

    public static class MyMapper extends TableMapper<Text, IntWritable> {

    private static IntWritable one = new IntWritable(1);

    private static Text word = new Text();

    @Override

    //输入的类型为:key:rowKey; value:一行数据的结果集Result

    protected void map(ImmutableBytesWritable key, Result value,

    Context context) throws IOException, InterruptedException {

    //获取一行数据中的colf:col

    String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表里面只有一个列族,所以我就直接获取每一行的值

    //按空格分割

    String itr[] = words.toString().split(" ");

    //循环输出word和1

    for (int i = 0; i < itr.length; i++) {

    word.set(itr[i]);

    context.write(word, one);

    }

    }

    }

    /**

     * MyReducer 继承 TableReducer

     * TableReducer<Text,IntWritable>

     * Text:输入的key类型,

     * IntWritable:输入的value类型,

     * ImmutableBytesWritable:输出类型,表示rowkey的类型

     */

    public static class MyReducer extends

    TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,

    Context context) throws IOException, InterruptedException {

    //对mapper的数据求和

    int sum = 0;

    for (IntWritable val : values) {//叠加

    sum += val.get();

    }

    // 创建put,设置rowkey为单词

    Put put = new Put(Bytes.toBytes(key.toString()));

    // 封装数据

    put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));

    //写到hbase,需要指定rowkey、put

    context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);

    }

    }

     

    public static void main(String[] args) throws IOException,

    ClassNotFoundException, InterruptedException {

    config.set("df.default.name", "hdfs://master:9000/");//设置hdfs的默认路径

    config.set("hadoop.job.ugi", "hadoop,hadoop");//用户名,组

    config.set("mapred.job.tracker", "master:9001");//设置jobtracker在哪

    //初始化表

    initTB();//初始化表

    //创建job

    Job job = new Job(config, "HBaseMr");//job

    job.setJarByClass(HBaseMr.class);//主类

    //创建scan

    Scan scan = new Scan();

    //可以指定查询某一列

    scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));

    //创建查询hbase的mapper,设置表名、scan、mapper类、mapper的输出key、mapper的输出value

    TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);

    //创建写入hbase的reducer,指定表名、reducer类、job

    TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    }

  • 相关阅读:
    如何得到数据绑定的树节点的父节点
    ImageBrush中的图片如何加载到到MemoryStream
    C#中动态加载和卸载DLL
    SetProcessWorkingSetSize减少内存占用
    wpf中如何改变Listbox选中项的颜色
    怎样把Visual Studio与Perforce关联起来
    在WPF里面如何使用FolderBrowserDialog
    关于WPF的ComboBox中Items太多而导致加载过慢的问题(转载)
    得到系统中所有正打开的文件
    把ResourceDictionary保存为文件,从外部xaml文件加载ResourceDictionary
  • 原文地址:https://www.cnblogs.com/shan13936/p/13781808.html
Copyright © 2011-2022 走看看