  • HBase Operations

1. HBase API operations

1) First, place core-site.xml, hbase-site.xml and hdfs-site.xml under the resources directory of the Maven project.

2) Configure the pom.xml file
Add the HBase dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>

3) Create HbaseTest.java

package com.hsiehchou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HbaseTest {
    //configuration object
    public static Configuration conf;
    //load the configuration (reads hbase-site.xml from the classpath)
    static {
        conf = HBaseConfiguration.create();
    }

Check whether a table exists in HBase

    //1. Check whether a table exists in HBase
    public static boolean isExist(String tableName) throws IOException {
        //table-level operations go through the Admin API
        //HBaseAdmin admin = new HBaseAdmin(conf); -- old API
        Connection connection = ConnectionFactory.createConnection(conf);
        //administration handle
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        //release the connection
        connection.close();
        return exists;
    }

Create a table in HBase

    //2. Create a table in HBase
    public static void createTable(String tableName, String... columnFamily) throws IOException {
        //1. table-level operations need the admin handle
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        //2. build the table descriptor
        HTableDescriptor hd = new HTableDescriptor(TableName.valueOf(tableName));
        //3. add one or more column families
        for (String cf : columnFamily) {
            hd.addFamily(new HColumnDescriptor(cf));
        }
        //4. create the table
        admin.createTable(hd);
        System.out.println("Table created successfully!");
        connection.close();
    }

Check the result in bin/hbase shell:
list
scan 'ni'
describe 'ni'

Add data to a table

    //3. Add data to a table: put rowkey cf:column value
    public static void addData(String tableName, String rowkey, String cf, String column, String value) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        //insert data via a Put
        Put put = new Put(Bytes.toBytes(rowkey));
        //specify column family, column and value
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        connection.close();
    }

Delete a single row

    //4. Delete a single row
    public static void deleteRow(String tableName, String rowkey) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        table.delete(delete);
        connection.close();
    }

Delete the data of multiple rowkeys

    //5. Delete the data of multiple rowkeys
    public static void deleteMore(String tableName, String... rowkey) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        //collect the Delete objects
        List<Delete> d = new ArrayList<Delete>();
        //one Delete per rowkey
        for (String rk : rowkey) {
            Delete dd = new Delete(Bytes.toBytes(rk));
            d.add(dd);
        }
        table.delete(d);
        connection.close();
    }

Full table scan

    //6. Full table scan
    public static void scanAll(String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner rs = table.getScanner(scan);
        //iterate over the rows
        for (Result r : rs) {
            //each row consists of cells
            Cell[] cells = r.rawCells();
            for (Cell c : cells) {
                System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(c)));
                System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(c)));
                System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(c)));
            }
        }
        connection.close();
    }

Delete a table

    //7. Delete a table
    public static void deleteTable(String tableName) throws IOException {
        //table-level operations need the admin handle
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        //a table must be disabled before it can be deleted
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
        connection.close();
    }

    public static void main(String[] args) throws IOException {
        //System.out.println(isExist("user"));
        //equivalent to shell: create 'table name','column family name'
        //createTable("ni", "info1", "info2", "info3");
        //addData("ni", "shanghai", "info1", "name", "lilei");
        //deleteRow("ni", "shanghai");
        //deleteMore("ni", "shanghai1", "shanghai2");

        //scanAll("ni");
        deleteTable("ni");
    }
}

2. HBase-MR

HBase is good at storing data; it is not designed for analyzing it.

To run computations over HBase data, combine it with Hadoop MapReduce.

List the jars that HBase-MR jobs require:
bin/hbase mapredcp

Set temporary environment variables:

export HBASE_HOME=/root/hd/hbase-1.3.0
export HADOOP_HOME=/root/hd/hadoop-2.8.4
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

Run an HBase-MR program (the built-in row counter):
bin/yarn jar /root/hd/hbase-1.3.0/lib/hbase-server-1.3.0.jar rowcounter user

3. HBase table operations with MapReduce

Scenario 1:
Read an HBase table region by region and filter for a specified column:
the source rows have the columns name, age, high; only name is kept.

Code:
ReadLoveMapper.java

package com.hsiehchou.mr;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * HBase-MR
 * Mapper class that reads the data from the HBase table.
 * key:   ImmutableBytesWritable -- the HBase rowkey
 * value: Result -- one row of data
 */
public class ReadLoveMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //1. build a Put keyed by the rowkey
        Put put = new Put(key.get());
        //2. filter the cells of this row
        for (Cell c : value.rawCells()) {
            //keep only cells from the info column family, drop everything else
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(c)))) {
                //keep only the name column
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
                    put.add(c);
                }
            }
        }
        //3. emit to the reducer
        context.write(key, put);
    }
}

WriteLoveReducer.java

package com.hsiehchou.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * keyIn:   ImmutableBytesWritable
 * valueIn: Put
 * keyOut:  NullWritable (the Put already carries the rowkey, so no output key is needed)
 */
public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        //write every Put of this rowkey into the target table
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}

LoverDriver.java

package com.hsiehchou.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoverDriver implements Tool {

    private Configuration conf;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        //1. create the job
        Job job = Job.getInstance(conf);
        //2. set the main class
        job.setJarByClass(LoverDriver.class);
        //3. configure the scan used to read the source table
        Scan scan = new Scan();
        //4. set the mapper: read from the "love" table
        TableMapReduceUtil.initTableMapperJob("love",
                scan,
                ReadLoveMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        //5. set the reducer: write to the "lovemr" table
        TableMapReduceUtil.initTableReducerJob("lovemr",
                WriteLoveReducer.class,
                job
        );
        //6. set the number of reduce tasks
        job.setNumReduceTasks(1);
        boolean rs = job.waitForCompletion(true);
        return rs ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            //exit status code
            int sts = ToolRunner.run(new LoverDriver(), args);
            System.exit(sts);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Scenario 2:
Import data from HDFS into an HBase table with HBase-MR.

Code:
ReadHdfsMapper.java

package com.hsiehchou.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Reads the data from HDFS.
 * hdfs -> hbase
 */
public class ReadHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. read one line
        String line = value.toString();
        //2. split the line (fields separated by a single space)
        String[] fields = line.split(" ");
        //3. convert the fields to bytes
        byte[] rowkey = Bytes.toBytes(fields[0]);
        byte[] name = Bytes.toBytes(fields[1]);
        byte[] desc = Bytes.toBytes(fields[2]);
        //4. wrap them in a Put
        Put put = new Put(rowkey);
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), name);
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("desc"), desc);
        //5. emit to the reducer
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}

    WriteHbaseReducer.java

package com.hsiehchou.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Writes the Puts produced by the mapper into the target HBase table.
 */
public class WriteHbaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put p : values) {
            context.write(NullWritable.get(), p);
        }
    }
}

    LoveDriver.java

package com.hsiehchou.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {

    private Configuration conf = null;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        //1. create the job (use the HBase-aware configuration)
        Job job = Job.getInstance(conf);
        job.setJarByClass(LoveDriver.class);
        //2. configure the mapper
        job.setMapperClass(ReadHdfsMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        //3. configure the reducer: write into the "lovehdfs" table
        TableMapReduceUtil.initTableReducerJob("lovehdfs", WriteHbaseReducer.class, job);
        //4. input configuration: read from HDFS with the default InputFormat
        FileInputFormat.addInputPath(job, new Path("/lovehbase/"));
        //5. no OutputFormat needed: the reducer setup already targets the table
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            int sts = ToolRunner.run(new LoveDriver(), args);
            System.exit(sts);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. HBase optimization

1) Pre-splitting regions
Why are regions split? Because a table can be very large (BigTable) and the data volume calls for distribution.

Regions store the data; when a table has multiple regions, each region maintains one rowkey range {startrowkey, endrowkey}.
For example, with rowkeys 1~10000:
region 1 holds 1~2000 (a rowkey such as 1980 lands here),
region 2 holds 2001~4000, and so on.

How many splits? Plan this in advance to improve HBase performance:
pre-split on rowkey ranges before any data is stored.

In practice (HBase shell):
create 'user_p','info','partition',SPLITS => ['201','202','203','204']

Table Regions

Region Server        Start Key   End Key
hsiehchou123:16020   -∞          201
hsiehchou124:16020   201         202
hsiehchou124:16020   202         203
hsiehchou123:16020   203         204
hsiehchou122:16020   204         +∞

create 'user_pppp','partition',SPLITS_FILE => 'partitions.txt'

partitions.txt (one split key per line) is placed in the directory from which the hbase shell is started.
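
The same pre-splitting can also be done from the Java API by passing split keys to Admin.createTable. Below is a minimal, standalone sketch; the table name user_p_api is illustrative, and the split keys mirror the shell example above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        //table descriptor with one column family, matching the shell example
        HTableDescriptor hd = new HTableDescriptor(TableName.valueOf("user_p_api"));
        hd.addFamily(new HColumnDescriptor("info"));
        //split keys: regions become (-∞,201), [201,202), [202,203), [203,204), [204,+∞)
        byte[][] splitKeys = {
                Bytes.toBytes("201"),
                Bytes.toBytes("202"),
                Bytes.toBytes("203"),
                Bytes.toBytes("204")
        };
        //create the pre-split table
        admin.createTable(hd, splitKeys);
        connection.close();
    }
}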

2) How to design the rowkey
The rowkey is the unique identifier of a row; which region a row lands in is determined by the pre-split ranges.

Design the rowkey carefully:
if a dataset is stored across 5 regions,
we want each region to hold roughly the same amount of data.

In short, scatter the data as evenly as possible across the regions.

Common approaches (a Java sketch of these follows below):
Random numbers / hash values:
the original rowkey 201 becomes, after hashing, something like
dfgyfugpgdcjhgfd11412nod
and 202 becomes:
21dqddwdgjohfxsovbxiufq12

String concatenation (e.g. a date plus a random suffix):
20190316_a3d4
20190316_g04f

String reversal:
201903161 -> 161309102
201903162 -> 261309102
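
A minimal Java sketch of the hash-prefix and reversal ideas. The class name RowkeyDesign, the helper method names, and the choice of MD5 for the prefix are illustrative assumptions, not taken from the original code.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RowkeyDesign {

    //hash/salt: prefix the original key with a short digest so consecutive keys scatter across regions
    public static String hashPrefixedRowkey(String originalKey) throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(originalKey.getBytes(StandardCharsets.UTF_8));
        //use the first two digest bytes as a hex prefix, keep the original key readable
        String prefix = String.format("%02x%02x", digest[0] & 0xff, digest[1] & 0xff);
        return prefix + "_" + originalKey;
    }

    //reversal: 201903161 -> 161309102, so keys sharing a date prefix spread out
    public static String reversedRowkey(String originalKey) {
        return new StringBuilder(originalKey).reverse().toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println(hashPrefixedRowkey("201"));    //prints something like ab12_201 (prefix depends on the digest)
        System.out.println(reversedRowkey("201903161"));  //161309102
    }
}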

3) Basic HBase tuning
HBase stores its data on HDFS, so HDFS settings matter.
Maximum number of open files a DataNode allows:
dfs.datanode.max.transfer.threads in hdfs-site.xml,
default 4096 -- increase it.

Transfer wait time:
dfs.image.transfer.timeout,
default 60000 ms -- increase it (both settings are shown in the hdfs-site.xml sketch below).
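
A sketch of how these two properties might look in hdfs-site.xml; the values 8192 and 120000 are illustrative assumptions only, tune them to your cluster.

<!-- hdfs-site.xml (illustrative values) -->
<property>
    <name>dfs.datanode.max.transfer.threads</name>
    <!-- default 4096; raise it for HBase workloads -->
    <value>8192</value>
</property>
<property>
    <name>dfs.image.transfer.timeout</name>
    <!-- default 60000 ms; raise it to tolerate slower transfers -->
    <value>120000</value>
</property>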

Memory tuning:
set the heap size in hadoop-env.sh;
30%~40% of available memory works best.

For example, 512m on a machine with 2G of memory:

export HADOOP_PORTMAP_OPTS="-Xmx512m $HADOOP_PORTMAP_OPTS"
