  • MapReduce example: a hotlink statistics program and its Oozie scheduling

    The MapReduce programs:

     Useful blogs:

    1.  http://blog.csdn.net/posa88?viewmode=contents — a series analyzing the Hadoop source code

    2.  http://www.cnblogs.com/xia520pi/ — a Hadoop cluster series (in particular the beginner-level walkthrough at http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html)

    Description: extract the short links from binary files, then count them and sort the counts in descending order.

    Approach: HotLinkCount extracts the short links and counts them (key: short link, value: occurrence count); HotLinkSort then reads that job's output and sorts it.
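    For concreteness: the count job writes tab-separated (count, link) lines through TextOutputFormat, and the sort job reads them back with KeyValueTextInputFormat, which splits each line at the first tab — which is why HotLinkSort's mapper parses a long out of its Text key. The links below are made-up examples of what sits between the two jobs:

        1532	http://t.cn/zYxWvU9
        877	http://t.cn/aBcDeF1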

    HotLinkCount:

    /*
     * Copyright (c) EEFUNG 2014 All Rights Reserved
     *
     * Counts how often each link occurs.
     */
    package com.eefung.hstore.mr.job.hotLink;

    import cn.antvision.eagleattack.model.Status;
    import com.eefung.hstore.help.HstoreConstants;
    import com.eefung.hstore.help.AvroUtils;
    import com.eefung.hstore.mr.input.TimeFramePathFilter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HotLinkCount extends Configured implements Tool {

        private static final Logger logger = LoggerFactory.getLogger(HotLinkCount.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkCount(), args));
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            String input = args[0].trim();
            String output = args[1].trim();
            int reducerNum = Integer.parseInt(args[2].trim());

            Job job = Job.getInstance(getConf());
            job.setJobName("HotLink Count");
            job.setJarByClass(HotLinkCount.class);
            job.setMapperClass(InnerMapper.class);
            job.setReducerClass(InnerReducer.class);

            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);

            job.setNumReduceTasks(reducerNum);
            // Only read input files that fall inside the configured time frame.
            FileInputFormat.setInputPathFilter(job, TimeFramePathFilter.class);

            // The input argument may list several source directories, separated by commas.
            String[] sources = input.split(HstoreConstants.DELIMITER_COMMA);
            for (String source : sources) {
                FileInputFormat.addInputPath(job, new Path(source));
            }

            FileOutputFormat.setOutputPath(job, new Path(output));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<LongWritable, BytesWritable, Text, LongWritable>
                implements HstoreConstants {

            private final static LongWritable one = new LongWritable(1);
            private Text word = new Text();

            // Matches http(s) URLs whose path segment is at least 4 characters long.
            private final Pattern pattern = Pattern
                    .compile("https?://[-A-Za-z0-9+&#%?=~_|!:,.;]+/[A-Za-z0-9+&#%=~_]{4,}");

            protected void map(LongWritable key, BytesWritable value, Context context)
                    throws IOException, InterruptedException {
                Status status = null;
                try {
                    // getBytes() returns the backing buffer, which may be longer than
                    // the valid data, so copy only the first getLength() bytes.
                    byte[] bytes = Arrays.copyOf(value.getBytes(), value.getLength());
                    status = AvroUtils.convertStatusAvroFromBytes(bytes);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (status == null || status.getContent() == null) {
                    return;
                }
                Matcher matcher = pattern.matcher(status.getContent().toString());
                while (matcher.find()) {
                    word.set(matcher.group(0));
                    context.write(word, one);
                }
            }
        }

        public static class InnerReducer extends
                Reducer<Text, LongWritable, LongWritable, Text> implements HstoreConstants {

            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                if (key.toString().isEmpty()) {
                    return;
                }
                long sum = 0L;
                for (LongWritable value : values) {
                    sum += value.get();
                }
                // Emit (count, link) so the next job can sort on the count.
                context.write(new LongWritable(sum), key);
            }
        }
    }
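    A command line to launch the count job might look like the following (the jar name and paths are placeholders; the first argument may list several comma-separated input directories, and the third is the reducer count):

        hadoop jar hstore.jar com.eefung.hstore.mr.job.hotLink.HotLinkCount \
            /eefung/input/sina /eefung/output/sina/count 12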

    HotLinkSort:

    package com.eefung.hstore.mr.job.hotLink;

    import java.io.IOException;

    import com.eefung.hstore.help.HstoreConstants;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HotLinkSort extends Configured implements Tool {

        private static final Logger logger = LoggerFactory.getLogger(HotLinkSort.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkSort(), args));
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            String input = args[0].trim();
            String output = args[1].trim();
            int reducerNum = Integer.parseInt(args[2].trim());

            Job job = Job.getInstance(getConf());
            job.setJobName("HotLinkSort");
            job.setJarByClass(HotLinkSort.class);

            job.setMapperClass(InnerMapper.class);
            job.setReducerClass(InnerReducer.class);
            // Sort the shuffle keys (the counts) in descending order.
            job.setSortComparatorClass(InnerComparator.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.setNumReduceTasks(reducerNum);
            for (String path : input.split(",")) {
                FileInputFormat.addInputPath(job, new Path(path));
            }
            FileOutputFormat.setOutputPath(job, new Path(output));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<Text, Text, LongWritable, Text>
                implements HstoreConstants {

            private LongWritable keyOut = new LongWritable();

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                // KeyValueTextInputFormat splits each line at the first tab, so the
                // key is the count written by HotLinkCount and the value is the link.
                try {
                    keyOut.set(Long.parseLong(key.toString().trim()));
                    context.write(keyOut, value);
                } catch (NumberFormatException e) {
                    logger.warn("skipping record with non-numeric count: {}", key);
                }
            }
        }

        public static class InnerReducer
                extends Reducer<LongWritable, Text, Text, LongWritable>
                implements HstoreConstants {

            @Override
            protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Restore the (link, count) order on output.
                for (Text value : values) {
                    context.write(value, key);
                }
            }
        }

        public static class InnerComparator extends WritableComparator {

            protected InnerComparator() {
                super(LongWritable.class, true);
            }

            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                // Negate the natural order to sort counts from largest to smallest.
                return -((LongWritable) a).compareTo((LongWritable) b);
            }
        }
    }
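    A slightly clearer way to write the descending comparator, equivalent to negating the natural order, is to swap the operands:

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((LongWritable) b).compareTo((LongWritable) a); // descending
        }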

    There was originally also a step that resolved the short links to their long URLs:

    package com.eefung.hstore.mr.job.hotLink;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.http.HttpHost;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpUriRequest;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.protocol.BasicHttpContext;
    import org.apache.http.protocol.ExecutionContext;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.util.EntityUtils;
    import com.eefung.hstore.help.HstoreConstants;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HotLinkRedirector extends Configured implements Tool {

        private static Redirector redirector = new Redirector();

        private static final Logger logger = LoggerFactory.getLogger(HotLinkRedirector.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkRedirector(), args));

            /* Local test harness: resolve links from a file and time the run.
            redirector = new Redirector();
            BufferedReader reader = new BufferedReader(new FileReader("E:\\temp.txt"));
            String shortLine;

            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String begin = df.format(new Date()); // current system time

            while ((shortLine = reader.readLine()) != null) {
                String url = redirector.getRedirectInfo(shortLine.trim());
                // System.out.println("short link: " + shortLine + "  long link: " + url);
            }
            String finish = df.format(new Date());
            System.out.println("started  " + begin);
            System.out.println("finished " + finish);
            */
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            int reducerNum = Integer.parseInt(args[2].trim());
            Job job = Job.getInstance(getConf());
            job.setJobName("HotLink Redirector");

            job.setJarByClass(HotLinkRedirector.class);
            job.setMapperClass(InnerMapper.class);
            // No reducer class is set, so the identity reducer merges the map output.

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.setNumReduceTasks(reducerNum);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<Text, Text, Text, LongWritable>
                implements HstoreConstants {

            private Text textOut = new Text();
            private LongWritable valueOut = new LongWritable();

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                redirector.release();
            }

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                String shortLink = key.toString();
                // Follow the redirect; fall back to the short link if resolution fails.
                String longLink = redirector.getRedirectInfo(shortLink);
                if (longLink == null) {
                    longLink = shortLink;
                }
                textOut.set(longLink);
                valueOut.set(Long.parseLong(value.toString()));
                context.write(textOut, valueOut);
            }
        }

        private static class Redirector {
            // Sends HTTP requests and receives HTTP responses for the resources
            // identified by the given URIs.
            private HttpClient httpClient;

            public Redirector() {
                httpClient = new DefaultHttpClient();
                httpClient.getParams().setParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 1000);
                httpClient.getParams().setParameter(HttpConnectionParams.SO_TIMEOUT, 1000);
            }

            public void release() {
                httpClient.getConnectionManager().shutdown();
            }

            public String getRedirectInfo(String shortLink) {
                HttpResponse response = null;
                try {
                    HttpGet httpGet = new HttpGet(shortLink);
                    // The context carries the state of this single request, including
                    // the final host and request after any redirects were followed.
                    HttpContext httpContext = new BasicHttpContext();
                    response = httpClient.execute(httpGet, httpContext);

                    HttpHost targetHost = (HttpHost) httpContext
                            .getAttribute(ExecutionContext.HTTP_TARGET_HOST);
                    HttpUriRequest realRequest = (HttpUriRequest) httpContext
                            .getAttribute(ExecutionContext.HTTP_REQUEST);

                    return targetHost.toString() + realRequest.getURI().toString();
                } catch (Throwable e) {
                    return null;
                } finally {
                    try {
                        if (response != null) {
                            EntityUtils.consume(response.getEntity());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    But this short-to-long resolution was far too slow: with a large data volume, whichever stage it was placed in would get killed. A sketch of one possible mitigation follows.
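    One mitigation (a sketch only, not part of the original job) would be to cache resolutions inside InnerMapper, since hot short links repeat many times within a task; with java.util.Map and java.util.HashMap imported, something like:

        // Hypothetical per-task cache: resolve each distinct short link at most once.
        private final Map<String, String> cache = new HashMap<>();

        private String resolveCached(String shortLink) {
            return cache.computeIfAbsent(shortLink, k -> {
                String longLink = redirector.getRedirectInfo(k);
                return longLink != null ? longLink : k; // fall back to the short link
            });
        }

    Even with caching, the per-link HTTP round trip dominates, which is why this step stayed out of the scheduled pipeline below.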

    Oozie scheduling:

    While learning Oozie I mainly followed the blog posts at http://shiyanjun.cn/archives/tag/oozie.

    Description: run the data for each of the sina, twitter, sohu, tencent, and netease platforms in turn (HotLinkCount runs first, and its output is used as the input to HotLinkSort).

    Writing the code: create a hotlink directory under the project's workflow directory and put the following files in it:

    1. coordinator.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <coordinator-app name="hotlink" frequency="${coord:days(1)}"
                     start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
        <action>
            <workflow>
                <app-path>${root}/workflow.xml</app-path>
            </workflow>
        </action>
    </coordinator-app>

    2. job.properties:

    nn=hdfs://hdt01:8020
    rm=hdt01:8050

    root=/eefung/yangzf/oozie/jobs/hotlink
    start=2015-01-21T13:00Z
    end=2020-01-01T00:00Z

    dest=/hotlink

    #oozie.wf.application.path=${nn}${root}

    oozie.coord.application.path=${nn}${root}
    oozie.libpath=${nn}${root}/lib

    cbaseColumnFamily=content

    beginDays=29
    endDays=26

    3. workflow.xml:

    <?xml version="1.0" encoding="utf-8"?>
    <workflow-app xmlns='uri:oozie:workflow:0.4' name='hotlink'>

        <start to="sina"/>

        <action name="sina">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>sina</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="twitter"/>
            <error to="twitter"/>
        </action>

        <action name="twitter">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>twitter</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="tencent"/>
            <error to="tencent"/>
        </action>

        <action name="tencent">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>tencent</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="netease"/>
            <error to="netease"/>
        </action>

        <action name="netease">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>netease</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="sohu"/>
            <error to="sohu"/>
        </action>

        <action name="sohu">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>sohu</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="end"/>
            <error to="fail"/>
        </action>

        <kill name="fail">
            <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>

        <end name="end"/>

    </workflow-app>

    4. workflow-hotlink-mr.xml:

    <?xml version="1.0" encoding="utf-8"?>
    <workflow-app xmlns='uri:oozie:workflow:0.4' name='hotlink'>

        <start to="count" />

        <action name="count">
            <map-reduce>
                <job-tracker>${rm}</job-tracker>
                <name-node>${nn}</name-node>
                <prepare>
                    <delete path="${nn}${root}/output/${platform}/count"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerReducer</value>
                    </property>
                    <property>
                        <name>mapreduce.inputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat</value>
                    </property>
                    <property>
                        <name>mapreduce.outputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.key.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.value.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapreduce.job.reduces</name>
                        <value>12</value>
                    </property>

                    <property>
                        <name>mapred.input.pathFilter.class</name>
                        <value>com.eefung.hstore.mr.input.TimeFramePathFilter</value>
                    </property>

                    <property>
                        <name>source.time.frame.begin</name>
                        <value>${beginDays}</value>
                    </property>
                    <property>
                        <name>source.time.frame.end</name>
                        <value>${endDays}</value>
                    </property>

                    <property>
                        <name>mapred.input.dir</name>
                        <value>${root}/input/${platform}/*</value>
                    </property>

                    <property>
                        <name>mapred.output.dir</name>
                        <value>${root}/output/${platform}/count</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="sort"/>
            <error to="fail"/>
        </action>

        <action name="sort">
            <map-reduce>
                <job-tracker>${rm}</job-tracker>
                <name-node>${nn}</name-node>
                <prepare>
                    <delete path="${nn}${root}/output/${platform}/sort"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>

                    <property>
                        <name>mapred.input.dir</name>
                        <value>${root}/output/${platform}/count/*</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${root}/output/${platform}/sort</value>
                    </property>

                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerReducer</value>
                    </property>

                    <property>
                        <name>mapreduce.compare.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerComparator</value>
                    </property>
                    <property>
                        <name>mapreduce.inputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat</value>
                    </property>
                    <property>
                        <name>mapreduce.outputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.key.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.value.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapreduce.job.reduces</name>
                        <value>1</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>

        <kill name="fail">
            <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>

    Note:

    1. Every job parameter must be written out, one property at a time, in the workflow configuration; Oozie does not run the program's main() method, so nothing set in run() takes effect. For example, job.setMapperClass(InnerMapper.class) has to be mirrored as the mapreduce.map.class property shown above.

    Running:

    1. Put workflow-hotlink-mr.xml, workflow.xml, and coordinator.xml on the cluster (my directory is /eefung/yangzf/oozie/jobs/hotlink), and put job.properties on the local machine (also under a directory named /eefung/yangzf/oozie/jobs/hotlink; the paths match, but one is on HDFS and the other on the gateway server).

    2. Run the command: oozie job -oozie http://hdt02:11000/oozie -config /home/oozie/eefung/yangzf/oozie/jobs/hotlink/job.properties -run

      On success this returns a job id, and the run can be tracked on the web console at http://hdt02:11000/oozie/.
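      The same CLI can then be used to check on or stop the job by that id:

        oozie job -oozie http://hdt02:11000/oozie -info <job-id>
        oozie job -oozie http://hdt02:11000/oozie -kill <job-id>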
