zoukankan      html  css  js  c++  java
  • ChainMapper和ChainReducer

    The ChainMapper class allows the use of multiple Mapper classes within a single Map task.

    The ChainReducer class allows chaining multiple Mapper classes after a Reducer within the Reducer task.

     http://www.oratea.net/?p=371

     通过ChainMapper可以将多个map类合并成一个map任务。

    下面这个例子没什么实际意义,但是很好地演示了ChainMapper的作用。

    源文件
    100 tom 90
    101 mary 85
    102 kate 60

    map00的结果,过滤掉100的记录
    101 mary 85
    102 kate 60

    map01的结果,过滤掉101的记录
    102 kate 60

    reduce结果
    102 kate 60

     package org.myorg;


    import java.io.IOException;
    import java.util.*;
    import java.lang.String;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;
    import org.apache.hadoop.mapred.lib.*;

    public class WordCount
    {

        
    public static class Map00 extends MapReduceBase implements Mapper
        {

            
    public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
            {

                Text ft 
    = new Text(“100″);

                
    if(!key.equals(ft))
                {
                    output.collect(key, value);
                }
            }
        }

        
    public static class Map01 extends MapReduceBase implements Mapper
        {

            
    public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException
            {

                Text ft 
    = new Text(“101″);

                
    if(!key.equals(ft))
                {
                    output.collect(key, value);
                }
            }
        }

        
    public static class Reduce extends MapReduceBase implements Reducer
        {
            
    public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
            {

                
    while(values.hasNext())
                {
                    output.collect(key, values.next());
                }

            }
        }

        
    public static void main(String[] args) throws Exception
        {

            JobConf conf 
    = new JobConf(WordCount.class);
            conf.setJobName(“wordcount00″);

            conf.setInputFormat(KeyValueTextInputFormat.
    class);
            conf.setOutputFormat(TextOutputFormat.
    class);

            ChainMapper cm 
    = new ChainMapper();

            JobConf mapAConf 
    = new JobConf(false);
            cm.addMapper(conf, Map00.
    class, Text.class, Text.class, Text.class, Text.classtrue, mapAConf);

            JobConf mapBConf 
    = new JobConf(false);
            cm.addMapper(conf, Map01.
    class, Text.class, Text.class, Text.class, Text.classtrue, mapBConf);

            conf.setReducerClass(Reduce.
    class);

            conf00.setOutputKeyClass(Text.
    class);
            conf00.setOutputValueClass(Text.
    class);

            FileInputFormat.setInputPaths(conf, 
    new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, 
    new Path(args[1]));

            JobClient.runJob(conf);

        }
    }
     

    另外一个例子,代码虽然很多,但其实很简单:Conn1、Conn2、Conn3 这几个类的内容是相同的。

    http://yixiaohuamax.iteye.com/blog/684244 

    package com.oncedq.code;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.text.SimpleDateFormat;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.hadoop.mapred.lib.ChainMapper;

    import com.oncedq.code.util.DateUtil;

    public class ProcessSample {
        
    public static class ExtractMappper extends MapReduceBase implements
                Mapper
    <LongWritable, Text, LongWritable, Conn1> {

            @Override
            
    public void map(LongWritable arg0, Text arg1,
                    OutputCollector
    <LongWritable, Conn1> arg2, Reporter arg3)
                    
    throws IOException {
                String line 
    = arg1.toString();
                String[] strs 
    = line.split(";");
                Conn1 conn1 
    = new Conn1();
                conn1.orderKey 
    = Long.parseLong(strs[0]);
                conn1.customer 
    = Long.parseLong(strs[1]);
                conn1.state 
    = strs[2];
                conn1.price 
    = Double.parseDouble(strs[3]);
                conn1.orderDate 
    = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");
                LongWritable lw 
    = new LongWritable(conn1.orderKey);
                arg2.collect(lw, conn1);
            }

        }

        
    private static class Conn1 implements WritableComparable<Conn1> {
            
    public long orderKey;
            
    public long customer;
            
    public String state;
            
    public double price;
            
    public java.util.Date orderDate;

            @Override
            
    public void readFields(DataInput in) throws IOException {
                orderKey 
    = in.readLong();
                customer 
    = in.readLong();
                state 
    = Text.readString(in);
                price 
    = in.readDouble();
                orderDate 
    = DateUtil.getDateFromString(Text.readString(in),
                        
    "yyyy-MM-dd");
            }

            @Override
            
    public void write(DataOutput out) throws IOException {
                out.writeLong(orderKey);
                out.writeLong(customer);
                Text.writeString(out, state);
                out.writeDouble(price);
                Text.writeString(out, DateUtil.getDateStr(orderDate, 
    "yyyy-MM-dd"));
            }

            @Override
            
    public int compareTo(Conn1 arg0) {
                
    // TODO Auto-generated method stub
                return 0;
            }

        }

        
    public static class Filter1Mapper extends MapReduceBase implements
                Mapper
    <LongWritable, Conn1, LongWritable, Conn2> {

            @Override
            
    public void map(LongWritable inKey, Conn1 c2,
                    OutputCollector
    <LongWritable, Conn2> collector, Reporter report)
                    
    throws IOException {
                
    if (c2.state.equals("F")) {
                    Conn2 inValue 
    = new Conn2();
                    inValue.customer 
    = c2.customer;
                    inValue.orderDate 
    = c2.orderDate;
                    inValue.orderKey 
    = c2.orderKey;
                    inValue.price 
    = c2.price;
                    inValue.state 
    = c2.state;
                    collector.collect(inKey, inValue);
                }
            }

        }

        
    private static class Conn2 implements WritableComparable<Conn1> {
            
    public long orderKey;
            
    public long customer;
            
    public String state;
            
    public double price;
            
    public java.util.Date orderDate;

            @Override
            
    public void readFields(DataInput in) throws IOException {
                orderKey 
    = in.readLong();
                customer 
    = in.readLong();
                state 
    = Text.readString(in);
                price 
    = in.readDouble();
                orderDate 
    = DateUtil.getDateFromString(Text.readString(in),
                        
    "yyyy-MM-dd");
            }

            @Override
            
    public void write(DataOutput out) throws IOException {
                out.writeLong(orderKey);
                out.writeLong(customer);
                Text.writeString(out, state);
                out.writeDouble(price);
                Text.writeString(out, DateUtil.getDateStr(orderDate, 
    "yyyy-MM-dd"));
            }

            @Override
            
    public int compareTo(Conn1 arg0) {
                
    // TODO Auto-generated method stub
                return 0;
            }

        }

        
    public static class RegexMapper extends MapReduceBase implements
                Mapper
    <LongWritable, Conn2, LongWritable, Conn3> {

            @Override
            
    public void map(LongWritable inKey, Conn2 c3,
                    OutputCollector
    <LongWritable, Conn3> collector, Reporter report)
                    
    throws IOException {
                c3.state 
    = c3.state.replaceAll("F""Find");
                Conn3 c2 
    = new Conn3();
                c2.customer 
    = c3.customer;
                c2.orderDate 
    = c3.orderDate;
                c2.orderKey 
    = c3.orderKey;
                c2.price 
    = c3.price;
                c2.state 
    = c3.state;
                collector.collect(inKey, c2);
            }
        }

        
    private static class Conn3 implements WritableComparable<Conn1> {
            
    public long orderKey;
            
    public long customer;
            
    public String state;
            
    public double price;
            
    public java.util.Date orderDate;

            @Override
            
    public void readFields(DataInput in) throws IOException {
                orderKey 
    = in.readLong();
                customer 
    = in.readLong();
                state 
    = Text.readString(in);
                price 
    = in.readDouble();
                orderDate 
    = DateUtil.getDateFromString(Text.readString(in),
                        
    "yyyy-MM-dd");
            }

            @Override
            
    public void write(DataOutput out) throws IOException {
                out.writeLong(orderKey);
                out.writeLong(customer);
                Text.writeString(out, state);
                out.writeDouble(price);
                Text.writeString(out, DateUtil.getDateStr(orderDate, 
    "yyyy-MM-dd"));
            }

            @Override
            
    public int compareTo(Conn1 arg0) {
                
    // TODO Auto-generated method stub
                return 0;
            }

        }

        
    public static class LoadMapper extends MapReduceBase implements
                Mapper
    <LongWritable, Conn3, LongWritable, Conn3> {

            @Override
            
    public void map(LongWritable arg0, Conn3 arg1,
                    OutputCollector
    <LongWritable, Conn3> arg2, Reporter arg3)
                    
    throws IOException {
                arg2.collect(arg0, arg1);
            }

        }

        
    public static void main(String[] args) {
            JobConf job 
    = new JobConf(ProcessSample.class);
            job.setJobName(
    "ProcessSample");
            job.setNumReduceTasks(
    0);
            job.setInputFormat(TextInputFormat.
    class);
            job.setOutputFormat(TextOutputFormat.
    class);
            JobConf mapper1 
    = new JobConf();
            JobConf mapper2 
    = new JobConf();
            JobConf mapper3 
    = new JobConf();
            JobConf mapper4 
    = new JobConf();
            ChainMapper cm 
    = new ChainMapper();
            cm.addMapper(job, ExtractMappper.
    class, LongWritable.class, Text.class,
                    LongWritable.
    class, Conn1.classtrue, mapper1);
            cm.addMapper(job, Filter1Mapper.
    class, LongWritable.class, Conn1.class,
                    LongWritable.
    class, Conn2.classtrue, mapper2);
            cm.addMapper(job, RegexMapper.
    class, LongWritable.class, Conn2.class,
                    LongWritable.
    class, Conn3.classtrue, mapper3);
            cm.addMapper(job, LoadMapper.
    class, LongWritable.class, Conn3.class,
                    LongWritable.
    class, Conn3.classtrue, mapper4);
            FileInputFormat.setInputPaths(job, 
    new Path("orderData"));
            FileOutputFormat.setOutputPath(job, 
    new Path("orderDataOutput"));
            Job job1;
            
    try {
                job1 
    = new Job(job);
                JobControl jc 
    = new JobControl("test");
                jc.addJob(job1);
                jc.run();
            } 
    catch (IOException e) {
                
    // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    } 

  • 相关阅读:
    BZOJ-4008: [HNOI2015]亚瑟王 (概率期望DP)
    BZOJ-4832: [Lydsy2017年4月月赛]抵制克苏恩 (概率期望DP)
    BZOJ-1415: [Noi2005]聪聪和可可 (期望DP)
    BZOJ2425 [HAOI2010]计数
    BZOJ2424 [HAOI2010]订货
    BZOJ2423 [HAOI2010]最长公共子序列
    BZOJ2299 [HAOI2011]向量
    BZOJ2298 [HAOI2011]problem a
    BZOJ1040 [ZJOI2008]骑士
    BZOJ一天提交(AC) 51纪念
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2165774.html
Copyright © 2011-2022 走看看