  • HBase with MapReduce

    1. Create a new MapReduce project.

    2. Create a lib folder under the project root and place commons-lang-2.5.jar, hbase-0.94.12.jar,

    protobuf-java-2.4.0.jar, and zookeeper-3.4.5.jar in it.

    3. Add the jars under lib to the Java Build Path.

    4. Write the MapReduce code:

    package org.apache.hadoop.examples;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class WordCountHBase {

        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable one = new IntWritable(1);

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Split each input line on spaces and emit (word, 1)
                String[] words = value.toString().trim().split(" ");
                for (String word : words) {
                    context.write(new Text(word), one);
                }
            }
        }

        public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable i : values) {
                    sum += i.get();
                }
                // Build a Put keyed by the word
                Put put = new Put(Bytes.toBytes(key.toString()));
                // Column family "content", qualifier "count", value is the total
                put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                        Bytes.toBytes(String.valueOf(sum)));
                context.write(NullWritable.get(), put);
            }
        }

        public static void createHBaseTable(String tablename) throws IOException {
            HTableDescriptor htd = new HTableDescriptor(tablename);
            HColumnDescriptor col = new HColumnDescriptor("content");
            htd.addFamily(col);
            Configuration config = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(config);
            if (admin.tableExists(tablename)) {
                System.out.println("table exists, recreating it!");
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            System.out.println("creating new table: " + tablename);
            admin.createTable(htd);
        }

        public static void main(String[] args) throws Exception {
            String tablename = "wordcount";
            Configuration conf = new Configuration();
            // This setting is critical; without it the job fails at runtime.
            // Depending on the cluster, hbase.zookeeper.quorum may also need
            // to be set here if hbase-site.xml is not on the classpath.
            conf.set("mapred.job.tracker", "192.168.1.164:9001");
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
            createHBaseTable(tablename);
            //String input = args[0];      // or take the input path from the command line
            String input = "dedup_in";
            Job job = new Job(conf, "WordCount table with " + input);
            job.setJarByClass(WordCountHBase.class);
            job.setNumReduceTasks(3);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
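
    To check that the job actually wrote into the table, here is a minimal read-back sketch against the same HBase 0.94-era client API (the class name WordCountVerify is illustrative and not part of the original post); the same check can be done interactively with scan 'wordcount' in the HBase shell:

    package org.apache.hadoop.examples;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WordCountVerify {
        public static void main(String[] args) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            // Open the output table written by the job above
            HTable table = new HTable(conf, "wordcount");
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"));
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result r : scanner) {
                    // Row key is the word; the count was stored as a string
                    String word = Bytes.toString(r.getRow());
                    String count = Bytes.toString(
                            r.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
                    System.out.println(word + " = " + count);
                }
            } finally {
                scanner.close();
                table.close();
            }
        }
    }

    Note that because the reducer stores the count as a string (Bytes.toBytes(String.valueOf(sum))), the value is decoded with Bytes.toString rather than Bytes.toInt.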
  • Original source: https://www.cnblogs.com/jingyunyb/p/3393753.html