  • MapReduce program example: hotlink statistics and its Oozie scheduling

    MapReduce program:

    Some useful blog posts:

    1. http://blog.csdn.net/posa88?viewmode=contents (a series analyzing the Hadoop source code)

    2. http://www.cnblogs.com/xia520pi/ (a series on Hadoop clusters; see in particular the beginner-level example at http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html)

    Description: extract the short links from binary files, then count them and sort the counts in descending order.

    Approach: HotLinkCount extracts the short links and counts them (key: short link, value: occurrence count). HotLinkSort then reads that job's output and sorts it.
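
    To see the counting idea in isolation, here is a minimal standalone sketch (not part of the MR jobs; the sample text and short links are made up for illustration) that applies the same style of URL pattern and tallies matches in a map, which is what the count job does in distributed form:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HotLinkDemo {
        // Same shape of pattern as the mapper below: scheme, host, then a short-link token.
        private static final Pattern LINK = Pattern
                .compile("https?://[-A-Za-z0-9+&#%?=~_|!:,.;]+/[A-Za-z0-9+&#%=~_]{4,}");

        public static void main(String[] args) {
            // Made-up sample content; http://t.cn/... links are hypothetical.
            String sample = "see http://t.cn/abcd and http://t.cn/abcd plus http://t.cn/efgh";
            Map<String, Long> counts = new HashMap<>();
            Matcher m = LINK.matcher(sample);
            while (m.find()) {
                String link = m.group(0);
                counts.merge(link, 1L, Long::sum);   // what the count job does across the cluster
            }
            // Print in the same "count<TAB>link" layout the count job emits.
            counts.forEach((link, n) -> System.out.println(n + "\t" + link));
        }
    }

    Running it prints 2 for http://t.cn/abcd and 1 for http://t.cn/efgh, in the count-then-link layout that the reducer below writes out.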

    HotLinkCount:

    /*
     * Copyright (c) EEFUNG 2014 All Rights Reserved
     *
     * Counts how often each link occurs.
     */
    package com.eefung.hstore.mr.job.hotLink;

    import cn.antvision.eagleattack.model.Status;
    import com.eefung.hstore.help.HstoreConstants;
    import com.eefung.hstore.help.AvroUtils;
    import com.eefung.hstore.mr.input.TimeFramePathFilter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HotLinkCount extends Configured implements Tool {

        private static final Logger logger = LoggerFactory.getLogger(HotLinkCount.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkCount(), args));
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            String input = args[0].trim();
            String output = args[1].trim();
            int reducerNum = Integer.parseInt(args[2].trim());
            Job job = Job.getInstance(getConf());
            job.setJobName("HotLink Count");
            job.setJarByClass(HotLinkCount.class);
            job.setMapperClass(InnerMapper.class);
            job.setReducerClass(InnerReducer.class);

            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);

            job.setNumReduceTasks(reducerNum);
            // Only read input files that fall inside the configured time frame.
            FileInputFormat.setInputPathFilter(job, TimeFramePathFilter.class);

            String[] sources = input.split(HstoreConstants.DELIMITER_COMMA);
            for (String source : sources) {
                FileInputFormat.addInputPath(job, new Path(source));
            }

            FileOutputFormat.setOutputPath(job, new Path(output));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<LongWritable, BytesWritable, Text, LongWritable>
                implements HstoreConstants {

            private final static LongWritable one = new LongWritable(1);
            private final Text word = new Text();

            private final Pattern pattern = Pattern
                    .compile("https?://[-A-Za-z0-9+&#%?=~_|!:,.;]+/[A-Za-z0-9+&#%=~_]{4,}");

            @Override
            protected void map(LongWritable key, BytesWritable value, Context context)
                    throws IOException, InterruptedException {
                Status status = null;
                try {
                    // Deserialize the Avro-encoded status record carried in the sequence file.
                    status = AvroUtils.convertStatusAvroFromBytes(value.getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (status == null || status.getContent() == null) {
                    return;
                }
                String content = status.getContent().toString();
                Matcher matcher = pattern.matcher(content);
                while (matcher.find()) {
                    word.set(matcher.group(0));
                    context.write(word, one);
                }
            }
        }

        public static class InnerReducer extends
                Reducer<Text, LongWritable, LongWritable, Text> implements HstoreConstants {

            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                long sum = 0L;
                for (LongWritable value : values) {
                    sum += value.get();
                }
                if (key.toString().length() == 0) {
                    return;
                }
                // Emit "count<TAB>link" so the sort job can key on the count.
                context.write(new LongWritable(sum), key);
            }
        }
    }
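    For reference, the driver takes three arguments (comma-separated input paths, output path, reducer count), so a manual run would look roughly like hadoop jar hstore.jar com.eefung.hstore.mr.job.hotLink.HotLinkCount <input> <output> 12, where the jar name and paths are placeholders.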

    HotLinkSort:

    package com.eefung.hstore.mr.job.hotLink;

    import java.io.IOException;

    import com.eefung.hstore.help.HstoreConstants;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HotLinkSort extends Configured implements Tool {

        private static final Logger logger = LoggerFactory.getLogger(HotLinkSort.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkSort(), args));
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            String input = args[0].trim();
            String output = args[1].trim();
            int reducerNum = Integer.parseInt(args[2].trim());
            Job job = Job.getInstance(getConf());
            job.setJobName("HotLinkSort");
            job.setJarByClass(HotLinkSort.class);

            job.setMapperClass(InnerMapper.class);
            job.setReducerClass(InnerReducer.class);
            // Sort the map output keys (the counts) in descending order.
            job.setSortComparatorClass(InnerComparator.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.setNumReduceTasks(reducerNum);
            String[] paths = input.split(",");
            for (String path : paths) {
                FileInputFormat.addInputPath(job, new Path(path));
            }
            FileOutputFormat.setOutputPath(job, new Path(output));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<Text, Text, LongWritable, Text>
                implements HstoreConstants {

            private final LongWritable keyOut = new LongWritable();

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                try {
                    // The count job wrote "count<TAB>link", so the key here is the count.
                    keyOut.set(Long.parseLong(key.toString().trim()));
                    context.write(keyOut, value);
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                }
            }
        }

        public static class InnerReducer
                extends Reducer<LongWritable, Text, Text, LongWritable>
                implements HstoreConstants {

            @Override
            protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text value : values) {
                    // Swap back to "link<TAB>count" on output.
                    context.write(value, key);
                }
            }
        }

        // Inverts the natural LongWritable ordering so larger counts come first.
        public static class InnerComparator extends WritableComparator {

            protected InnerComparator() {
                super(LongWritable.class, true);
            }

            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                LongWritable aa = (LongWritable) a;
                LongWritable bb = (LongWritable) b;
                return -aa.compareTo(bb);
            }
        }
    }
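    One thing to keep in mind with the sort job: each reducer sorts only its own partition, so a single, globally descending result requires a single reducer. The Oozie workflow later in the post sets mapreduce.job.reduces to 1 for the sort stage, presumably for this reason.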

    There was originally also a step that resolved the short links into long links:

    package com.eefung.hstore.mr.job.hotLink;

    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpUriRequest;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.protocol.BasicHttpContext;
    import org.apache.http.protocol.ExecutionContext;
    import org.apache.http.protocol.HttpContext;
    import org.apache.http.util.EntityUtils;
    import com.eefung.hstore.help.HstoreConstants;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HotLinkRedirector extends Configured implements Tool {

        private static Redirector redirector = new Redirector();

        private static final Logger logger = LoggerFactory.getLogger(HotLinkRedirector.class);

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new HotLinkRedirector(), args));

            /* Local timing test, kept for reference:
            redirector = new Redirector();
            BufferedReader reader = new BufferedReader(new FileReader("E:\\temp.txt"));
            String shortLine;
            SimpleDateFormat df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            df1.format(new Date()); // current time at start
            while ((shortLine = reader.readLine()) != null) {
                String url = redirector.getRedirectInfo(shortLine.trim());
                // System.out.println("short link: " + shortLine + "  long link: " + url);
            }
            SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            df2.format(new Date()); // current time at end
            System.out.println("start " + df1);
            System.out.println("end " + df2);
            */
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                logger.warn("wrong args!");
                return 1;
            }
            int reducerNum = Integer.parseInt(args[2].trim());
            Job job = Job.getInstance(getConf());
            job.setJobName("HotLink Redirector");

            job.setJarByClass(HotLinkRedirector.class);
            job.setMapperClass(InnerMapper.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.setNumReduceTasks(reducerNum);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static class InnerMapper
                extends Mapper<Text, Text, Text, LongWritable>
                implements HstoreConstants {

            private final Text textOut = new Text();
            private final LongWritable valueOut = new LongWritable();

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                redirector.release();
            }

            @Override
            protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                String shortLink = key.toString();
                String longLink = redirector.getRedirectInfo(shortLink);
                if (longLink == null) {
                    longLink = shortLink; // keep the short link if resolution failed
                }
                textOut.set(longLink);
                valueOut.set(Long.parseLong(value.toString()));
                context.write(textOut, valueOut);
            }
        }

        private static class Redirector {
            // Sends HTTP requests and receives responses for the resource identified by a URI.
            private HttpClient httpClient;

            public Redirector() {
                httpClient = new DefaultHttpClient();
                httpClient.getParams().setParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 1000);
                httpClient.getParams().setParameter(HttpConnectionParams.SO_TIMEOUT, 1000);
            }

            public void release() {
                httpClient.getConnectionManager().shutdown();
            }

            public String getRedirectInfo(String shortLink) {
                HttpResponse response = null;
                try {
                    HttpGet httpGet = new HttpGet(shortLink);
                    // Per-request context; after execution it holds the final target host
                    // and request, i.e. where the redirect chain ended up.
                    HttpContext httpContext = new BasicHttpContext();
                    response = httpClient.execute(httpGet, httpContext);

                    HttpHost targetHost = (HttpHost) httpContext
                            .getAttribute(ExecutionContext.HTTP_TARGET_HOST);
                    HttpUriRequest realRequest = (HttpUriRequest) httpContext
                            .getAttribute(ExecutionContext.HTTP_REQUEST);

                    return targetHost.toString() + realRequest.getURI().toString();
                } catch (Throwable e) {
                    return null;
                } finally {
                    try {
                        if (response != null) {
                            EntityUtils.consume(response.getEntity());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    But this resolution step was far too slow: with a large data volume, whichever stage it was placed in would end up getting killed, so it was dropped.
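
    For what it's worth, one way to cheapen the lookup (my own sketch, not something the original setup used) is to skip downloading the target page entirely: issue the request with redirect-following disabled and read the Location header yourself. This resolves only one hop, so chained shorteners would need a loop:

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class HeadRedirector {
        /** Resolves one redirect hop, or returns null if the link doesn't redirect. */
        public static String resolveOneHop(String shortLink) {
            try {
                HttpURLConnection conn = (HttpURLConnection) new URL(shortLink).openConnection();
                conn.setRequestMethod("HEAD");            // ask for headers only, no body
                conn.setInstanceFollowRedirects(false);   // read Location ourselves
                conn.setConnectTimeout(1000);
                conn.setReadTimeout(1000);
                int code = conn.getResponseCode();
                if (code >= 300 && code < 400) {
                    return conn.getHeaderField("Location");
                }
                return null;
            } catch (Exception e) {
                return null;
            }
        }

        public static void main(String[] args) {
            // Hypothetical short link, for illustration only.
            System.out.println(resolveOneHop("http://t.cn/abcd"));
        }
    }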

    Oozie scheduling:

    I mainly learned from the posts at http://shiyanjun.cn/archives/tag/oozie.

    Description: run the data for each of the sina, twitter, sohu, tencent, and netease platforms (HotLinkCount runs first, and its output is the input to HotLinkSort).

    Writing the files: create a hotlink directory under the project's workflow directory and put the following files in it:

    1. coordinator.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <coordinator-app name="hotlink" frequency="${coord:days(1)}"
                     start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
        <action>
            <workflow>
                <app-path>${root}/workflow.xml</app-path>
            </workflow>
        </action>
    </coordinator-app>

    2. job.properties:

    nn=hdfs://hdt01:8020
    rm=hdt01:8050

    root=/eefung/yangzf/oozie/jobs/hotlink
    start=2015-01-21T13:00Z
    end=2020-01-01T00:00Z

    dest=/hotlink

    #oozie.wf.application.path=${nn}${root}

    oozie.coord.application.path=${nn}${root}
    oozie.libpath=${nn}${root}/lib

    cbaseColumnFamily=content

    # Fed to the TimeFramePathFilter through the source.time.frame.begin/end
    # properties in workflow-hotlink-mr.xml.
    beginDays=29
    endDays=26

    3. workflow.xml:

    <?xml version="1.0" encoding="utf-8"?>
    <workflow-app xmlns='uri:oozie:workflow:0.4' name='hotlink'>

        <start to="sina"/>

        <action name="sina">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>sina</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="twitter"/>
            <error to="twitter"/>
        </action>

        <action name="twitter">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>twitter</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="tencent"/>
            <error to="tencent"/>
        </action>

        <action name="tencent">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>tencent</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="netease"/>
            <error to="netease"/>
        </action>

        <action name="netease">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>netease</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="sohu"/>
            <error to="sohu"/>
        </action>

        <action name="sohu">
            <sub-workflow>
                <app-path>${root}/workflow-hotlink-mr.xml</app-path>
                <propagate-configuration/>
                <configuration>
                    <property>
                        <name>platform</name>
                        <value>sohu</value>
                    </property>
                </configuration>
            </sub-workflow>
            <ok to="end"/>
            <error to="fail"/>
        </action>

        <kill name="fail">
            <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>

        <end name="end"/>

    </workflow-app>
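    One detail worth noticing here: each platform action's <error> transition points at the next platform rather than at the fail node, so a failure on one platform does not abort the remaining ones; only the last action (sohu) routes errors to fail.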

    4. workflow-hotlink-mr.xml:

    <?xml version="1.0" encoding="utf-8"?>
    <workflow-app xmlns='uri:oozie:workflow:0.4' name='hotlink'>

        <start to="count" />

        <action name="count">
            <map-reduce>
                <job-tracker>${rm}</job-tracker>
                <name-node>${nn}</name-node>
                <prepare>
                    <delete path="${nn}${root}/output/${platform}/count"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerReducer</value>
                    </property>
                    <property>
                        <name>mapreduce.inputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat</value>
                    </property>
                    <property>
                        <name>mapreduce.outputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.key.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.value.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapreduce.job.reduces</name>
                        <value>12</value>
                    </property>

                    <property>
                        <name>mapred.input.pathFilter.class</name>
                        <value>com.eefung.hstore.mr.input.TimeFramePathFilter</value>
                    </property>

                    <property>
                        <name>source.time.frame.begin</name>
                        <value>${beginDays}</value>
                    </property>
                    <property>
                        <name>source.time.frame.end</name>
                        <value>${endDays}</value>
                    </property>

                    <property>
                        <name>mapred.input.dir</name>
                        <value>${root}/input/${platform}/*</value>
                    </property>

                    <property>
                        <name>mapred.output.dir</name>
                        <value>${root}/output/${platform}/count</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="sort"/>
            <error to="fail"/>
        </action>

        <action name="sort">
            <map-reduce>
                <job-tracker>${rm}</job-tracker>
                <name-node>${nn}</name-node>
                <prepare>
                    <delete path="${nn}${root}/output/${platform}/sort"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>

                    <property>
                        <name>mapred.input.dir</name>
                        <value>${root}/output/${platform}/count/*</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${root}/output/${platform}/sort</value>
                    </property>

                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerReducer</value>
                    </property>

                    <property>
                        <name>mapreduce.compare.class</name>
                        <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerComparator</value>
                    </property>
                    <property>
                        <name>mapreduce.inputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat</value>
                    </property>
                    <property>
                        <name>mapreduce.outputformat.class</name>
                        <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.key.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapred.mapoutput.value.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.hadoop.io.Text</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.hadoop.io.LongWritable</value>
                    </property>
                    <property>
                        <name>mapreduce.job.reduces</name>
                        <value>1</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>

        <kill name="fail">
            <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>

    Note:

    1. Every MapReduce parameter has to be written into the workflow configuration, one property per driver setting; Oozie does not run the program's main() method. For example, job.setMapperClass(InnerMapper.class) in the driver corresponds to the mapreduce.map.class property above.

    Running it:

    1. Put workflow-hotlink-mr.xml, workflow.xml, and coordinator.xml on the cluster (my directory is /eefung/yangzf/oozie/jobs/hotlink), and put job.properties on the local machine (directory /eefung/yangzf/oozie/jobs/hotlink). The path names are the same, but one lives on HDFS and the other on the proxy server.

    2. Run the command: oozie job -oozie http://hdt02:11000/oozie -config /home/oozie/eefung/yangzf/oozie/jobs/hotlink/job.properties -run

      If it succeeds, the command returns a job id, and the run can be tracked at http://hdt02:11000/oozie/.
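
      The returned job id can also be queried from the command line, e.g. oozie job -oozie http://hdt02:11000/oozie -info <job-id> (standard Oozie CLI usage; substitute the actual id that -run printed).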
