zoukankan      html  css  js  c++  java
  • MapReduce程序

     

    [Hadoop编程实践]一个实用、清晰的MapReduce程序

    今天写的日志合并MapReduce程序,重新梳理了一遍写一个MapReduce Job的要点:

    1. 参数获取。

    我一般都会在参数中包含这几项:输入文件路径、工作路径(.finish文件所在的目录)、输出文件路径(结果数据所在的路径,在实际工程中,一般和工作路径不同)。还有一个wait/submit参数,用来说明Job是通过waitForCompletion还是submit的方式提交,waitForCompletion在测试和调试时用,submit在生产环境中用。

    2. 参数检查

    各种参数的格式检查,通不过就直接退出,这一步要严格。

    3. 创建Job

    4. 设定mapper、reducer

    可能还需要设定partitioner,sort comparator, grouping comparator,因任务的复杂程度而定。

    5. 设定输入和工作路径

    注意FileOutputFormat.setOutputPath(job, new Path(workingDir));设置的是workingDir,在实践中一般都将workingDir和最终数据的outputDir分开。主要是因为workingDir得到的数据都是part-00000这样的形式,不能自己命名。所以一般会在最后reducer中自己用FileWriter去创建结果数据文件,不用context.write.

    6. 设定输入和输出文件格式

    7. 设置配置项

    为了在mapper、reducer以及Job的其他worker之间共享一些简单的数据,可以使用JobConf. 如果要共享复杂、量大的数据,可以使用DistributedCache。在最近的实践中,有用序列化文件+DistributedCache在各个Job worker之间共享HashMap,List以及其他自定义数据结构的经验,可行。

    8. 提交Job

    代码如下,敬请批评。

    复制代码
      1 import java.io.IOException;
      2 import java.util.regex.Matcher;
      3 import java.util.regex.Pattern;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.mapred.JobConf;
      8 import org.apache.hadoop.mapreduce.Job;
      9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     10 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     12 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     13 import org.apache.hadoop.util.GenericOptionsParser;
     14 
     15 import com.hadoop.compression.lzo.LzopCodec;
     16 
     17 /**
     18  * MapReduce job to combine all hourly logs from different data-collection servers
     19  * @author lsyang, 20130507
     20  */
     21 public class HourlyLogCombine {
     22     private static String RAW_FILE_PREFIX = "post_";
     23     private static String RAW_FILE_POSTFIX = ".log";
     24     
     25     public static String JOB_CONF_DATE = "HourlyLogCombine.Date";
     26     public static String JOB_CONF_HOUR = "HourlyLogCombine.Hour";
     27     public static String JOB_CONF_OUTDIR = "HourlyLogCombine.OutDir";
     28     
     29     private static void showHelpAndExit(String info) {
     30         System.err.println("Usage: HourlyLogCombine <Date: yyyyMMdd> <Hour: hh> " +
     31                 "<RowLogDir, e.g. /user/click_tracker/appbi/data/raw/> " +
     32                 "<workingDir, e.g. /user/click_tracker/appbi/working/>" +
     33                 "<CombineLogDir, e.g. /user/click_tracker/appbi/data/hourly_combine/>" +
     34                 "<wait or submit>");
     35         if(info != null && !info.isEmpty()) {
     36             System.err.println("Error: " + info);
     37         }
     38         System.exit(0);
     39     }
     40     
     41     private static void checkDate(String date) {
     42         String regex = "^(20\\d\\d)(0\\d|1[012])(0[1-9]|[12][0-9]|3[01])$";
     43         Pattern pattern = Pattern.compile(regex);
     44         Matcher matcher = pattern.matcher(date);
     45         if (!matcher.find()) {
     46             showHelpAndExit("wrong date format.");
     47         }
     48     }
     49 
     50     private static void checkHour(String hour) {
     51         String regex = "^[0-1]\\d|2[0-3]$";
     52         Pattern pattern = Pattern.compile(regex);
     53         Matcher matcher = pattern.matcher(hour);
     54         if (!matcher.find()) {
     55             showHelpAndExit("wrong hour format.");
     56         }
     57     }
     58     
     59     private static boolean checkWaitOrSubmit(String waitORsubmit) {
     60         if (waitORsubmit.equalsIgnoreCase("wait")) {
     61             return true;
     62         } else if (waitORsubmit.equalsIgnoreCase("submit")) {
     63             return false;
     64         } else {
     65             showHelpAndExit("wait or submit: please check the spelling.");
     66             return false;
     67         }
     68     }
     69     
     70     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
     71         // get the application-specific arguments
     72         Configuration conf = new Configuration();
     73         String[] params = new GenericOptionsParser(conf, args).getRemainingArgs();
     74         if(params.length != 6) {
     75             showHelpAndExit("6 params needed.");
     76         }
     77         
     78         // parameters
     79         String date = params[0];
     80         String hour = params[1];
     81         String rawLogHome = params[2];
     82         String workingHome = params[3];
     83         String combinedLogHome = params[4];
     84         String waitORsubmit = params[5];
     85         if (!rawLogHome.endsWith("/")) rawLogHome += "/";
     86         if(!combinedLogHome.endsWith("/")) combinedLogHome += "/";
     87         
     88         // check parameters
     89         checkDate(date);
     90         checkHour(hour);
     91         boolean wait = checkWaitOrSubmit(waitORsubmit);
     92         
     93         // get input files
     94         String inputFiles = rawLogHome + "*/" + date + "/" + RAW_FILE_PREFIX + date + "_" + hour + RAW_FILE_POSTFIX;
     95         // get working dir, where the .finish file resides
     96         String workingDir = workingHome + date + "/" + hour + "/";
     97         // get output dir, where the combined log file resides
     98         String outDir = combinedLogHome + date + "/";
     99         
    100         // create a mapreduce job
    101         Job job = new Job(conf, "HourlyLogCombine");
    102         job.setJarByClass(HourlyLogCombine.class);
    103         
    104         // set mapper, partitioner and reducer
    105         job.setMapperClass(HourlyLogCombineMapper.class);
    106         job.setPartitionerClass(HourlyLogCombinePartitioner.class);
    107         job.setReducerClass(HourlyLogCombineReducer.class);
    108         
    109         // set input and output dir
    110         FileInputFormat.addInputPath(job, new Path(inputFiles));
    111         FileOutputFormat.setOutputPath(job, new Path(workingDir));
    112         
    113         // set input and output file format
    114         job.setInputFormatClass(TextInputFormat.class);
    115         job.setOutputFormatClass(TextOutputFormat.class);
    116         TextOutputFormat.setCompressOutput(job, true);
    117         TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    118         
    119         // set configurations
    120         JobConf jobConf = (JobConf)job.getConfiguration();
    121         jobConf.set(JOB_CONF_DATE, date);
    122         jobConf.set(JOB_CONF_HOUR, hour);
    123         jobConf.set(JOB_CONF_OUTDIR, outDir);
    124         
    125         // run the job
    126         if (wait) {
    127             job.waitForCompletion(true);
    128         } else {
    129             job.submit();
    130         }
    131     }
    132 }
    复制代码

    【Linux常用指令整理9】vi的使用1

     

    vi 有三种工作模式,命令模式、插入模式和编辑模式。

    键入命令:vi + 文件名 就可启动vi,缺省是命令模式。

     

    键入i进入插入模式,插入模式按esc回到命令模式

     

    命令模式

    插入命令

    a 在光标后插入文本

    A 在行末插入文本

    i 在光标前插入文本

    I 在行首插入文本

    o 在光标下插入文本

    O 在光标上插入文本

     

    移动光标

    h 向左移动

    j  向下移动

    k 向上移动

    l  向右移动

     

    $ 移至行首

    0 移至行尾

     

    H 跳到上端

    M 跳到中端

    L 跳到下端

     

    跳到具体行

    :set nu 设置行号

    :set nonu 取消行号

    gg 到第一行

    G 到最后一行

    nG 到第n行

    :n 到第n行

     

     

    删除命令

    x 删除光标字符

    nx 删除光标开始以后的n个字符

     

    dd 删除光标所在行

    ndd 删除光标开始以后的n行

    D 删除光标开始到行尾的内容

    dG 删除光标开始到末尾的内容

     

    :n1,n2d 删除n1行到n2行的内容

     

    复制、剪切、粘贴

    yy 复制当前行

    nyy 复制光标及以下的n行

     

    dd 剪切当前行

    ndd 剪切光标及以下的n行

     

    p 粘贴光标所在行下面

    P 粘贴到光标所在行的上面

     

    替换、撤销操作

    r 替换光标所在字符

    R 从光标所在字符开始替换,直到按esc结束

    u 撤销上一步操作

     

    搜索、替换

    /关键字 搜索和关键字有关的行,按n查看下一个,按N查看上一个

    :set ic 搜索时忽略大小写

    :set noic 关闭忽略大小写

     

    :%s/old/new/g 全文替换

    :n1,n2s/old/new/g 只替换n1到n2

     

    :wq 退出保存键入

    :q 退出不保存键入

    :w 保存不退出

    :w 文件名 另存为

    注:如果权限不足在后面加!,但仅限于root和所有者有用

  • 相关阅读:
    【洛谷 P4166】 [SCOI2007]最大土地面积(凸包,旋转卡壳)
    专题
    【洛谷 P3299】 [SDOI2013]保护出题人 (凸包,三分,斜率优化)
    【洛谷 P3628】 [APIO2010]特别行动队 (斜率优化)
    $POJ1995$ $Raising$ $Modulo$ $Numbers$
    快速运算模板(未完待续)
    $Luogu$ $P1879$ $[USACO06NOV]$ 玉米田 $Corn Fields$
    [转载] $AT2444$ 题解
    [转载] $CF117B$ 题解
    [转载] $CF543B$ 题解
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3065663.html
Copyright © 2011-2022 走看看