zoukankan      html  css  js  c++  java
  • 使用MapReduce处理Hbase数据

      今天终于把MR处理HBase数据的程序搞定了,自己走了好多的弯路。程序写完之后,在本机的伪分布式hadoop上跑是没问题的,可是把程序上传到集群上就出错了,最后发现是zookeeper没配对:在编译的时候没有把conf目录添加到CLASSPATH,这才导致出错的。

      下面是MR测试的程序:

     1 import java.io.IOException;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.hbase.HBaseConfiguration;
     5 import org.apache.hadoop.hbase.HColumnDescriptor;
     6 import org.apache.hadoop.hbase.HTableDescriptor;
     7 import org.apache.hadoop.hbase.client.HBaseAdmin;
     8 import org.apache.hadoop.hbase.client.Put;
     9 import org.apache.hadoop.hbase.client.Result;
    10 import org.apache.hadoop.hbase.client.Scan;
    11 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    12 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    13 import org.apache.hadoop.hbase.mapreduce.TableMapper;
    14 import org.apache.hadoop.hbase.mapreduce.TableReducer;
    15 import org.apache.hadoop.hbase.util.Bytes;
    16 import org.apache.hadoop.io.IntWritable;
    17 import org.apache.hadoop.io.Text;
    18 import org.apache.hadoop.mapreduce.Job;
    19 
    20 public class Test {
    21     private static final String sourceTable = "sourceTable";
    22     private static final String targetTable = "targetTable";
    23     static Configuration config = HBaseConfiguration.create();
    24     
    25     public static void createTable(String tablename, String[] cfs) throws IOException {
    26         HBaseAdmin admin = new HBaseAdmin(config);
    27         if (admin.tableExists(tablename)) {
    28             System.out.println("table already exists");
    29         }
    30         else {
    31             HTableDescriptor tableDesc = new HTableDescriptor(tablename);
    32             for (int i = 0; i < cfs.length; i++) {
    33                 tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
    34             }
    35             admin.createTable(tableDesc);
    36             System.out.println("create table successly");
    37         }
    38     }
    39     /**
    40      * @param args
    41      * @throws IOException
    42      * @throws ClassNotFoundException 
    43      * @throws InterruptedException 
    44      */
    45     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    46         // TODO Auto-generated method stub
    47         String[] cfs = {"a"};
    48         createTable(targetTable, cfs);
    49         Job job = new Job(config, "test");
    50         job.setJarByClass(Test.class);
    51         Scan scan = new Scan();
    52         scan.setCaching(1024);
    53         scan.setCacheBlocks(false);
    54         TableMapReduceUtil.initTableMapperJob(
    55                 sourceTable,        
    56                 scan,               
    57                 Mapper1.class,    
    58                 Text.class,         
    59                 IntWritable.class,  
    60                 job);    
    61         TableMapReduceUtil.initTableReducerJob(
    62                 targetTable,        
    63                 Reducer1.class,    
    64                 job);
    65         boolean b = job.waitForCompletion(true);
    66         if(!b){
    67             throw new IOException("error");
    68         }
    69     }
    70 
    71     public static class Mapper1 extends
    72             TableMapper<Text, IntWritable> {
    73             private final IntWritable ONE = new IntWritable(1);
    74             private Text text = new Text();
    75             public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException{
    76                 String id = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a")));
    77                 text.set(id);
    78                     context.write(id, ONE);
    79             }
    80     }
    81     public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
    82         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
    83             int i = 0;
    84             for (IntWritable val : values){
    85                 i += val.get();
    86             }
    87             Put put = new Put(Bytes.toBytes(key.toString()));
    88             put.add(Bytes.toBytes("a"), Bytes.toBytes("c"), Bytes.toBytes(i));
    89             context.write(null, put);
    90         }
    91     }
    92 }

    编写完成后需要打包,打包可以在本地打,也可以在服务器上打包,但一定要先设置CLASSPATH:

    export CLASSPATH=/data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/

    在终端运行这个命令,或者直接将此命令写在家目录下的.bashrc中也可以,

    然后创建  test_classes文件夹,

    运行命令:

    javac -d test_classes/ Test.java

    运行完成后会在test_classes文件夹下生成3个.class文件

    然后运行

    jar -cvf test.jar -C test_classes .

    即可生成test.jar 文件

    最后运行:

    bin/hadoop jar test.jar Test

    运行MR程序即可

  • 相关阅读:
    python super()
    git用法小结(1)--建立远程仓库
    git用法小结(1)--建立远程仓库
    Linux下多线程查看工具(pstree、ps、pstack)
    JAVA操作Oracle数据库中的事务
    MyEclipse 2013优化配置【转】
    C语言多线程编程 死锁解析
    在linux终端执行clear或top命令时出现:'xterm' unknown terminal type的错误
    Linux中获取本机网络信息的几个函数及应用
    基于内存的通信之一 “内核共享消息队列”
  • 原文地址:https://www.cnblogs.com/hitandrew/p/2855631.html
Copyright © 2011-2022 走看看