zoukankan      html  css  js  c++  java
  • HBase 与Map Reduce的联合使用

    在工作中常常会碰到利用MR来对HBase来操作的情况,这里举几个经典使用场景来说明

    环境准备:

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    
    </dependencies>

    如果用普通的Configuration,需要自己手动添加集群位置

    如果利用HBaseConfiguration,需要将hbase-site.xml  添加到类路径下,这样就可以自动读取hbase的配置了

     

    一、将HDFS上的数据导入到HBase中

    数据如下

    10      ACCOUNTING      1700
    20      RESEARCH        1800
    30      SALES           1900
    40      OPERATIONS      1700

    将其导入到Hbase中,也就是新建一个表

    1、Mapper

    因为是从HDFS上读取内容,所以需要继承正常的Mapper类

    public class Example2Mapper extends Mapper<LongWritable,Text,Text,Put> {
    
        private Text key_out=new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            String[] words = value.toString().split("	");
    
            key_out.set(words[0]);
    
            //封装Put
            Put put = new Put(Bytes.toBytes(words[0]));
    
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(words[1]))
                    .addColumn(Bytes.toBytes("info"),Bytes.toBytes("num"),Bytes.toBytes(words[2]));
            //写出
            context.write(key_out,put);
    
    
    
        }
    }

    2、Reducer

    因为是向Hbase上写数据,需要继承Hbase提供的TableReducer类,

    TableReducer要求Valueout必须是一个Mutation类型!Mutation类型代表一个写操作对象,

    例如Put,Delete等!

    这里正好传入的就是Put,所以Reduer这不需要进行任何逻辑

    public class Example2Reducer extends TableReducer<Text,Put,Text> {
    
    
    }

    3、Driver

    在Hadoop中,只要有reducer阶段,key-value必须实现序列化!

    hadoop提供了Writable接口! key-value如果实现了Wriable接口,hadoop会提供序列化器!

    如果Key-value没有实现Wriable接口,需要自己提供序列化器!

    所以在设置Driver时,可以使用TableMapReduceUtil工具类!

    调用工具类,工具类可以自动进行各种设置,包括提供序列化器

    注意:需要提前把目标表创建好,结构与Mapper中的逻辑保持一致

    public class Example2Driver {
    
        public static void main(String[] args) throws Exception {
    
            //创建configuration,调用此方法的configuration可以读取hbase的配置文件
            Configuration conf = HBaseConfiguration.create();
    
    
            //创建Job
            Job job = Job.getInstance(conf);
    
            job.setMapperClass(Example2Mapper.class);
    
            FileInputFormat.setInputPaths(job,new Path("hdfs://hadoop102:9000/hbasedata/dept.data"));
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Put.class);
    
            //设置Reducer
            TableMapReduceUtil.initTableReducerJob("dept_mr2", Example2Reducer.class,job);
    
            job.waitForCompletion(true);
        }
    }

    二、利用MR将HBase表中的部分数据复制到另一份表中

     因为是复制表,所以需要新建一个结构跟原表一模一样的表

    1、Mapper

    从HBase中读取数据,需要继承TableMapper

    注意:如果Mapper的输出是 Put类型,此时系统自动设置了PutCombiner!会根据key来进行combiner

    在某些场景,例如将记录迁移的场景,是不需要Combiner的!所以需要将key设为唯一值

    public class Example1Mapper extends TableMapper<Text,Put> {
    
        private Text key_out=new Text();
    
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    
            //获取rowkey
            String rowKey = Bytes.toString(key.copyBytes());
    
            key_out.set(rowKey);
    
            //判断rowkey是否>20
            if (Integer.parseInt(rowKey) > 20){
    
                Put put = new Put(key.copyBytes());
    
                //获取Result中的每个Cell
                Cell[] cells = value.rawCells();
    
                for (Cell cell : cells) {
                //将Result中的多个cell,封装到put对象中,每个cell作为一列
                    put.add(cell);
    
                }
    
                //写出
                context.write(key_out,put);
    
            }
    
        }
    }

    2、Reducer

    无需任何逻辑

    public class Example1Reducer extends TableReducer<Text,Put,Text> {
    
    
    }

    3、Driver

    public class Example1Driver {
    
        public static void main(String[] args) throws Exception {
    
            //创建configuration,调用此方法的configuration可以读取hbase的配置文件
            Configuration conf = HBaseConfiguration.create();
    
            //创建Job
            Job job = Job.getInstance(conf);
    
            // 创建扫描器
            Scan scan = new Scan();
    
            //设置Mapper
                //会自动提供序列化器
            TableMapReduceUtil.initTableMapperJob
                    ("dept",scan,Example1Mapper.class, Text.class, Put.class,job);
    
            //设置Reducer
            TableMapReduceUtil.initTableReducerJob("dept_mr",Example1Reducer.class,job);
    
            job.waitForCompletion(true);
        }
    }

     

  • 相关阅读:
    VS2010 LNK1123: 转换到 COFF 期间失败: 文件无效或损坏 的解决方法
    Navicat Premium11.0.16 for mac 中文破解版
    angular input输入框中使用filter格式化日期
    Mac下搭建Eclipse Android开发环境
    mac下修改.bash_profile立即生效的方法
    Ionic ngMessage 表单验证
    mongodb授权登录
    Ionic开发之条形码扫描
    ionic 到真相后$http.get()无法请求,导致空白的情况,如何解决
    Xcode 7中http通信出现如下错误:Application Transport Security has blocked a cleartext HTTP (http://) resource load since it is insecure. Temporary exceptions can be configured via your app's Info.plist file.
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/12543123.html
Copyright © 2011-2022 走看看