zoukankan      html  css  js  c++  java
  • Hbase访问方式之Mapreduce

    Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。

    一个简单示例:

    说明:从日志表中,统计每个IP访问网站目录的总数

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. package man.ludq.hbase;  
    2.   
    3. import java.io.IOException;  
    4.   
    5. import org.apache.hadoop.conf.Configuration;  
    6. import org.apache.hadoop.hbase.HBaseConfiguration;  
    7. import org.apache.hadoop.hbase.client.Put;  
    8. import org.apache.hadoop.hbase.client.Result;  
    9. import org.apache.hadoop.hbase.client.Scan;  
    10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
    11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
    12. import org.apache.hadoop.hbase.mapreduce.TableMapper;  
    13. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
    14. import org.apache.hadoop.hbase.util.Bytes;  
    15. import org.apache.hadoop.io.IntWritable;  
    16. import org.apache.hadoop.io.Text;  
    17. import org.apache.hadoop.mapreduce.Job;  
    18.   
    19. public class ExampleTotalMapReduce{  
    20.     public static void main(String[] args) {  
    21.         try{  
    22.             Configuration config = HBaseConfiguration.create();  
    23.             Job job = new Job(config,"ExampleSummary");  
    24.             job.setJarByClass(ExampleTotalMapReduce.class);     // class that contains mapper and reducer  
    25.   
    26.             Scan scan = new Scan();  
    27.             scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs  
    28.             scan.setCacheBlocks(false);  // don't set to true for MR jobs  
    29.             // set other scan attrs  
    30.             //scan.addColumn(family, qualifier);  
    31.             TableMapReduceUtil.initTableMapperJob(  
    32.                     "access-log",        // input table  
    33.                     scan,               // Scan instance to control CF and attribute selection  
    34.                     MyMapper.class,     // mapper class  
    35.                     Text.class,         // mapper output key  
    36.                     IntWritable.class,  // mapper output value  
    37.                     job);  
    38.             TableMapReduceUtil.initTableReducerJob(  
    39.                     "total-access",        // output table  
    40.                     MyTableReducer.class,    // reducer class  
    41.                     job);  
    42.             job.setNumReduceTasks(1);   // at least one, adjust as required  
    43.   
    44.             boolean b = job.waitForCompletion(true);  
    45.             if (!b) {  
    46.                 throw new IOException("error with job!");  
    47.             }   
    48.         } catch(Exception e){  
    49.             e.printStackTrace();  
    50.         }  
    51.     }  
    52.   
    53.     public static class MyMapper extends TableMapper<Text, IntWritable>  {  
    54.   
    55.         private final IntWritable ONE = new IntWritable(1);  
    56.         private Text text = new Text();  
    57.   
    58.         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {  
    59.             String ip = Bytes.toString(row.get()).split("-")[0];  
    60.             String url = new String(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("url")));  
    61.             text.set(ip+"&"+url);  
    62.             context.write(text, ONE);  
    63.         }  
    64.     }  
    65.   
    66.     public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {  
    67.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
    68.             int sum = 0;  
    69.             for (IntWritable val : values) {  
    70.                 sum += val.get();  
    71.             }  
    72.   
    73.             Put put = new Put(key.getBytes());  
    74.             put.add(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));  
    75.   
    76.             context.write(null, put);  
    77.         }  
    78.     }  
    79. }  

    参考文档:

    1、Mapreduce读取和写入Hbase(从A表读取数据,统计结果放入B表,非常详细,附有代码说明以及流程)
     
    2、Mapreduce操作Hbase(官方文档,包括 读/读写/多表输出/输出到文件/输出到RDBMS/Job中访问其他的HBase Tables)
  • 相关阅读:
    TCC
    使用RocketMQ实现分布式事务
    CentOS关机
    使用grub手动引导linux和windows
    CentOS下X Window与命令行界面的切换
    Centos下 为firefox安装flash插件
    tar.xz文件如何解压
    用Linux命令wget进行整站下载
    CentOS关闭火狐浏览器Flash过期提示
    CentOS普通用户添加sudo权限
  • 原文地址:https://www.cnblogs.com/bluecoder/p/3824265.html
Copyright © 2011-2022 走看看