  • Using MapReduce to Process HBase Data

      Today I finally got the MapReduce program that processes HBase data working, after quite a few detours. Once the program was written it ran fine on my local pseudo-distributed Hadoop, but as soon as I uploaded it to the cluster it failed. It turned out the ZooKeeper settings were not being picked up: I had not added the conf directory to the CLASSPATH when compiling, and that is what caused the error.
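
      As a side note, if putting the conf directory on the classpath is not an option, the ZooKeeper connection settings can also be set directly on the Configuration object. A minimal sketch (the quorum hosts and port below are illustrative placeholders, not the actual cluster values):

    Configuration config = HBaseConfiguration.create();
    // These values are normally read from hbase-site.xml on the classpath;
    // here they are set explicitly. Host names and port are placeholders.
    config.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
    config.set("hbase.zookeeper.property.clientPort", "2181");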

      Below is the MR test program:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Test {
    private static final String sourceTable = "sourceTable";
    private static final String targetTable = "targetTable";
    static Configuration config = HBaseConfiguration.create();

    // Create the output table with the given column families, if it does not exist yet.
    public static void createTable(String tablename, String[] cfs) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(config);
        if (admin.tableExists(tablename)) {
            System.out.println("table already exists");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tablename);
            for (int i = 0; i < cfs.length; i++) {
                tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
            }
            admin.createTable(tableDesc);
            System.out.println("table created successfully");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String[] cfs = {"a"};
        createTable(targetTable, cfs);

        Job job = new Job(config, "test");
        job.setJarByClass(Test.class);

        Scan scan = new Scan();
        scan.setCaching(1024);      // fetch 1024 rows per RPC to speed up the scan
        scan.setCacheBlocks(false); // don't fill the block cache with MR scan data

        TableMapReduceUtil.initTableMapperJob(
                sourceTable,        // input table
                scan,               // scan instance controlling the input
                Mapper1.class,      // mapper class
                Text.class,         // mapper output key type
                IntWritable.class,  // mapper output value type
                job);
        TableMapReduceUtil.initTableReducerJob(
                targetTable,        // output table
                Reducer1.class,     // reducer class
                job);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error");
        }
    }

    public static class Mapper1 extends TableMapper<Text, IntWritable> {
        private final IntWritable ONE = new IntWritable(1);
        private Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Read column cf:a from the current row and emit (value, 1).
            String id = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a")));
            text.set(id);
            context.write(text, ONE);
        }
    }

    public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            // Store the total in column a:c of the target table, keyed by the mapper key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("a"), Bytes.toBytes("c"), Bytes.toBytes(i));
            context.write(null, put);
        }
    }
}
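
    After the job has run, a quick way to sanity-check the result is to scan the target table back. A minimal sketch using the same 0.94-era client API (table, family, and qualifier names taken from the code above):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyCounts {
    public static void main(String[] args) throws IOException {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "targetTable");
        try {
            ResultScanner scanner = table.getScanner(new Scan());
            for (Result r : scanner) {
                // The reducer stored each count as a 4-byte int in column a:c.
                int count = Bytes.toInt(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("c")));
                System.out.println(Bytes.toString(r.getRow()) + " -> " + count);
            }
            scanner.close();
        } finally {
            table.close();
        }
    }
}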

    Once the code is written it needs to be packaged. The jar can be built locally or on the server, but either way the CLASSPATH must be set:

    export CLASSPATH=/data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/

    Run this command in the terminal, or add it to the .bashrc in your home directory. Note that the trailing conf/ entry is what puts hbase-site.xml (and with it the ZooKeeper settings) on the classpath; leaving it out caused the error described above.

    Then create a test_classes directory,

    and run:

    javac -d test_classes/ Test.java

    When compilation finishes, the test_classes directory will contain three .class files (Test.class, Test$Mapper1.class, and Test$Reducer1.class).

    Then run:

    jar -cvf test.jar -C test_classes .

    which produces the test.jar file.
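
    Before submitting, you can list the jar's contents to confirm the three classes made it in:

    jar -tf test.jar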

    Finally, run:

    bin/hadoop jar test.jar Test

    to launch the MR job.
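
    If the hadoop client itself cannot find the HBase classes or hbase-site.xml when submitting, one option (assuming the same paths as the CLASSPATH export above) is to put them on HADOOP_CLASSPATH for the submission:

    export HADOOP_CLASSPATH=/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/
    bin/hadoop jar test.jar Test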

  • Original post: https://www.cnblogs.com/hitandrew/p/2855631.html