  • HBase MapReduce

    1. HBase to HBase

    The Mapper extends TableMapper; its input key is the row key (ImmutableBytesWritable) and its input value is a Result.

    public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
        public TableMapper() {
        }
    }
    package com.scb.jason.mapper;
    
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    
    /**
     * Copies the info:name and info:age cells of each row into a Put
     * keyed by the original row key.
     */
    public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {
    
        private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();
    
        @Override
        public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Reuse the input row key as the map output key.
            mapOutputkey.set(key.get());
            Put put = new Put(key.get());
            for(Cell cell:value.rawCells()){
    
                if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                    if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell);
                    }
                    if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell);
                    }
                }
    
            }
            context.write(mapOutputkey,put);
        }
    
    }

    The Reducer extends TableReducer; its output value type is fixed to Mutation.

    public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
        public TableReducer() {
        }
    }
    package com.scb.jason.reducer;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    
    import java.io.IOException;
    
    /**
     * Pass-through reducer: writes each Put to the output table unchanged.
     */
    public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{
    
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            for(Put put:values){
                // TableOutputFormat ignores the key; each Put already carries its row key.
                context.write(null,put);
            }
        }
    }
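
    Since this reducer only passes each Put through, the driver could instead plug in HBase's built-in identity reducer (assuming org.apache.hadoop.hbase.mapreduce.IdentityTableReducer, which ships with HBase 1.x):

    TableMapReduceUtil.initTableReducerJob(
            "basic",                       // output table
            IdentityTableReducer.class,    // built-in pass-through reducer
            job);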

    Driver

    package com.scb.jason.driver;
    
    import com.scb.jason.mapper.User2BasicMapper;
    import com.scb.jason.reducer.User2BasicReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * Copies the info:name and info:age columns of every row in the 'user'
     * table into the 'basic' table.
     */
    public class User2BasicDriver extends Configured implements Tool{
    
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
            job.setJarByClass(this.getClass());
            Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
            // set other scan attrs
    
            TableMapReduceUtil.initTableMapperJob(
                    "user",      // input table
                    scan,             // Scan instance to control CF and attribute selection
                    User2BasicMapper.class,   // mapper class
                    ImmutableBytesWritable.class,             // mapper output key
                    Put.class,             // mapper output value
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    "basic",      // output table
                    User2BasicReducer.class,             // reducer class
                    job);
            job.setNumReduceTasks(1);
            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;   // exit code 0 signals success
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int status = ToolRunner.run(configuration,new User2BasicDriver(),args);
            System.exit(status);
        }
    
    }

     2. HBase to File

    Mapper

    package com.scb.jason.mapper;
    
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    
    /**
     * Exports the info:name and info:age columns of each row as a
     * tab-separated line keyed by the row key.
     */
    public class User2FileMapper extends TableMapper<Text, Text> {
    
        private Text rowKeyText = new Text();
        private Text valueText = new Text();
    
        @Override
        public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // The text output key is the row key.
            rowKeyText.set(key.get());
            byte[] inforName = null;
            byte[] inforAge = null;
            for(Cell cell:value.rawCells()){
    
                if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                    if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        inforName = CellUtil.cloneValue(cell);
                    }
                    if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        inforAge = CellUtil.cloneValue(cell);
                    }
                }
    
            }
            valueText.set(Bytes.toString(inforName) + "\t" + Bytes.toString(inforAge));
            context.write(rowKeyText,valueText);
        }
    
    }

    No Reducer is needed; this is a map-only job.

    Driver

    package com.scb.jason.driver;
    
    import com.scb.jason.mapper.User2FileMapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Exports the 'user' table to tab-separated text files under the path
     * given as the first command-line argument.
     */
    public class User2FileDriver extends Configured implements Tool{
    
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
            job.setJarByClass(this.getClass());
            Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
            // set other scan attrs
    
            TableMapReduceUtil.initTableMapperJob(
                    "user",      // input table
                    scan,             // Scan instance to control CF and attribute selection
                    User2FileMapper.class,   // mapper class
                    Text.class,             // mapper output key
                    Text.class,             // mapper output value
                    job);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job,new Path(args[0]));
            job.setNumReduceTasks(0);   // map-only job, no reducer needed
            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;   // exit code 0 signals success
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int status = ToolRunner.run(configuration,new User2FileDriver(),args);
            System.exit(status);
        }
    
    }

    3. File to HBase

    Driver

    package com.scb.jason.driver;
    
    import com.scb.jason.mapper.File2HbaseMapper;
    import com.scb.jason.reducer.File2HBaseReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /**
     * Loads rows from a text file into the 'basic' table through a
     * Put-emitting mapper and a pass-through reducer.
     */
    public class File2BasicDriver extends Configured implements Tool{
    
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
            job.setJarByClass(this.getClass());
            job.setMapperClass(File2HbaseMapper.class);
            FileInputFormat.addInputPath(job,new Path("F:\\Workspace\\File")); // backslashes must be escaped in Java string literals
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            TableMapReduceUtil.initTableReducerJob(
                    "basic",      // output table
                    File2HBaseReducer.class,             // reducer class
                    job);
            job.setNumReduceTasks(1);
            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;   // exit code 0 signals success
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int status = ToolRunner.run(configuration,new File2BasicDriver(),args);
            System.exit(status);
        }
    
    }

    Mapper

    package com.scb.jason.mapper;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    /**
     * Parses whitespace-separated lines of the form "rowkey name age" into
     * Puts on the info column family.
     */
    public class File2HbaseMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {
    
        private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();
    
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();
            StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
            if(stringTokenizer.countTokens() < 3){
                return; // skip blank or malformed lines
            }
            String rowkey = stringTokenizer.nextToken();
            String name = stringTokenizer.nextToken();
            String age = stringTokenizer.nextToken();
            Put put = new Put(Bytes.toBytes(rowkey));
            put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name));
            put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(age));
            // Key the output by the parsed rowkey, not the file offset, so that
            // Puts for the same row are grouped in one reduce call.
            mapOutputkey.set(Bytes.toBytes(rowkey));
            context.write(mapOutputkey,put);
        }
        }
    
    
    }
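
    For reference, the mapper assumes whitespace-separated input lines of the form "rowkey name age", e.g. (made-up sample rows):

    10001 zhangsan 25
    10002 lisi 28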

    Reducer

    package com.scb.jason.reducer;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    
    import java.io.IOException;
    
    /**
     * Pass-through reducer: writes each Put to the output table unchanged.
     */
    public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            for(Put put:values){
                context.write(null,put);
            }
        }
    
    }

     4. HBase to RDBMS

    public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {
    
      private Connection c = null;
    
      public void setup(Context context) {
        // create DB connection...
      }
    
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // do summarization
        // in this example the keys are Text, but this is just an example
      }
    
      public void cleanup(Context context) {
        // close db connection
      }
    
    }
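
    A minimal concrete version of this skeleton, as a sketch only: it assumes a JDBC driver on the classpath, and the connection URL, credentials, and summary(key_col, total) table below are all placeholders; in practice they would come from the job Configuration.

    package com.scb.jason.reducer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    /**
     * Sums the counts for each key and writes one row per key to an RDBMS.
     */
    public class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private Connection c = null;
        private PreparedStatement stmt = null;

        @Override
        protected void setup(Context context) throws IOException {
            try {
                // Placeholder URL, user, and password.
                c = DriverManager.getConnection("jdbc:mysql://dbhost:3306/report", "user", "password");
                stmt = c.prepareStatement("INSERT INTO summary (key_col, total) VALUES (?, ?)");
            } catch (SQLException e) {
                throw new IOException("Cannot open DB connection", e);
            }
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            try {
                stmt.setString(1, key.toString());
                stmt.setInt(2, sum);
                stmt.executeUpdate();
            } catch (SQLException e) {
                throw new IOException("Cannot write row to DB", e);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            try {
                if (stmt != null) stmt.close();
                if (c != null) c.close();
            } catch (SQLException e) {
                throw new IOException("Cannot close DB connection", e);
            }
        }
    }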

    5. File -> HFile -> HBase bulk load

    http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
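
    A rough sketch of the linked approach, assuming HBase 1.x APIs (HFileOutputFormat2 and LoadIncrementalHFiles) and reusing the 'basic' table and File2HbaseMapper from section 3; both paths are taken from the command line. Instead of issuing Puts through the normal write path, the job writes sorted HFiles and then moves them into the table's regions in one step.

    package com.scb.jason.driver;

    import com.scb.jason.mapper.File2HbaseMapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Bulk load: write HFiles with HFileOutputFormat2, then hand them to the
     * region servers with LoadIncrementalHFiles.
     */
    public class File2HFileBulkLoadDriver {

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            Job job = Job.getInstance(conf, "file2hfile-bulkload");
            job.setJarByClass(File2HFileBulkLoadDriver.class);
            job.setMapperClass(File2HbaseMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));   // input text files
            Path hfileDir = new Path(args[1]);                      // staging directory for HFiles
            FileOutputFormat.setOutputPath(job, hfileDir);

            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("basic"));
                 RegionLocator locator = connection.getRegionLocator(TableName.valueOf("basic"))) {
                // Sets the sort reducer and a total-order partitioner matching the region boundaries.
                HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
                if (job.waitForCompletion(true)) {
                    // Move the finished HFiles into the table's regions.
                    new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, connection.getAdmin(), table, locator);
                }
            }
        }
    }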

  • Original article: https://www.cnblogs.com/xdlaoliu/p/7406789.html