  • MapReduce实战(三)分区的实现


    在实战(一)的基础上,实现自定义分区机制。例如根据手机号的不同,分成不同的省份,然后在不同的reduce上面跑,最后生成的结果分别存在不同的文件中。




    1、改造分区的逻辑,自定义一个partitioner,主要是实现其中的方法:

        getPartition(Text key,Text value,int numPartitions)

    2、自定义reducer task的并发任务数,使得多个reduce同时工作。



    package cn.darrenchan.hadoop.mr.areapartition;
    import java.util.HashMap;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
        private static HashMap<String,Integer> areaMap = new HashMap<>();
         * 这里只是提前设定了一下,其实这里可以写查询数据库,返回号码所在省份的编号
            areaMap.put("135", 0);
            areaMap.put("136", 1);
            areaMap.put("137", 2);
            areaMap.put("138", 3);
            areaMap.put("139", 4);
        public int getPartition(KEY key, VALUE value, int numPartitions) {
            int areaCoder  = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3));
            return areaCoder;


    package cn.darrenchan.hadoop.mr.areapartition;
    import java.io.IOException;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import cn.darrenchan.hadoop.mr.flow.FlowBean;
     * 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件 
     * 需要自定义改造两个机制:
     * 1、改造分区的逻辑,自定义一个partitioner
     * 2、自定义reducer task的并发任务数
    public class FlowSumArea {
        public static class FlowSumAreaMapper extends
                Mapper<LongWritable, Text, Text, FlowBean> {
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // 拿一行数据
                String line = value.toString();
                // 切分成各个字段
                String[] fields = StringUtils.split(line, "	");
                // 拿到我们需要的字段
                String phoneNum = fields[1];
                long upFlow = Long.parseLong(fields[7]);
                long downFlow = Long.parseLong(fields[8]);
                // 封装数据为kv并输出
                context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow,
        public static class FlowSumAreaReducer extends
                Reducer<Text, FlowBean, Text, FlowBean> {
            protected void reduce(Text key, Iterable<FlowBean> values,
                    Context context) throws IOException, InterruptedException {
                long up_flow_counter = 0;
                long d_flow_counter = 0;
                for (FlowBean bean : values) {
                    up_flow_counter += bean.getUpFlow();
                    d_flow_counter += bean.getDownFlow();
                context.write(key, new FlowBean(key.toString(), up_flow_counter,
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 设置我们自定义的分组逻辑定义
    job.setPartitionerClass(AreaPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 设置reduce的任务并发数,应该跟分组的数量保持一致,写1不会报错,2,3,4,5均会报错,7,8,9...反而不会报错,因为后面的直接数据为0了 job.setNumReduceTasks(6); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }


    package cn.darrenchan.hadoop.mr.flow;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    public class FlowBean implements WritableComparable<FlowBean> {
        private String phoneNum;// 手机号
        private long upFlow;// 上行流量
        private long downFlow;// 下行流量
        private long sumFlow;// 总流量
        public FlowBean() {
        public FlowBean(String phoneNum, long upFlow, long downFlow) {
            this.phoneNum = phoneNum;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        public String getPhoneNum() {
            return phoneNum;
        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        public long getUpFlow() {
            return upFlow;
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        public long getDownFlow() {
            return downFlow;
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        public long getSumFlow() {
            return sumFlow;
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        public String toString() {
            return upFlow + "	" + downFlow + "	" + sumFlow;
        // 从数据流中反序列出对象的数据
        // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
        public void readFields(DataInput in) throws IOException {
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        // 将对象数据序列化到流中
        public void write(DataOutput out) throws IOException {
        public int compareTo(FlowBean flowBean) {
            return sumFlow > flowBean.getSumFlow() ? -1 : 1;


    hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea


    17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/
    17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 1
    17/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:1
    17/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0005
    17/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0005
    17/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/
    17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_0005
    17/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false
    17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%
    17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%
    17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%
    17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%
    17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%
    17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%
    17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%
    17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully
    17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=1152
    FILE: Number of bytes written=652142
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=2338
    HDFS: Number of bytes written=526
    HDFS: Number of read operations=21
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=12
    Job Counters
    Launched map tasks=1
    Launched reduce tasks=6
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=2663
    Total time spent by all reduces in occupied slots (ms)=83315
    Total time spent by all map tasks (ms)=2663
    Total time spent by all reduce tasks (ms)=83315
    Total vcore-seconds taken by all map tasks=2663
    Total vcore-seconds taken by all reduce tasks=83315
    Total megabyte-seconds taken by all map tasks=2726912
    Total megabyte-seconds taken by all reduce tasks=85314560
    Map-Reduce Framework
    Map input records=22
    Map output records=22
    Map output bytes=1072
    Map output materialized bytes=1152
    Input split bytes=124
    Combine input records=0
    Combine output records=0
    Reduce input groups=21
    Reduce shuffle bytes=1152
    Reduce input records=22
    Reduce output records=21
    Spilled Records=44
    Shuffled Maps =6
    Failed Shuffles=0
    Merged Map outputs=6
    GC time elapsed (ms)=524
    CPU time spent (ms)=3210
    Physical memory (bytes) snapshot=509775872
    Virtual memory (bytes) snapshot=2547916800
    Total committed heap usage (bytes)=218697728
    Shuffle Errors
    File Input Format Counters
    Bytes Read=2214
    File Output Format Counters
    Bytes Written=526




    Last login: Sun Feb 26 04:26:01 2017 from
    [hadoop@weekend110 ~] jps
    2473 NameNode
    8703 RunJar
    9214 Jps
    9029 YarnChild
    8995 YarnChild
    2747 SecondaryNameNode
    8978 -- process information unavailable
    2891 ResourceManager
    2992 NodeManager
    8799 MRAppMaster
    9053 YarnChild
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    2747 SecondaryNameNode
    2891 ResourceManager
    2992 NodeManager
    8799 MRAppMaster
    2569 DataNode
    9330 Jps
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    9495 Jps
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    9558 Jps
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9580 Jps
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9598 YarnChild
    9482 MRAppMaster
    2747 SecondaryNameNode
    9623 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9650 Jps
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9665 YarnChild
    2747 SecondaryNameNode
    9681 YarnChild
    9696 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9772 Jps
    9482 MRAppMaster
    9665 YarnChild
    2747 SecondaryNameNode
    9681 YarnChild
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9817 Jps
    9665 -- process information unavailable
    2747 SecondaryNameNode
    9681 YarnChild
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    9681 YarnChild
    9872 Jps
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9921 Jps
    2747 SecondaryNameNode
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    9770 YarnChild
    9751 -- process information unavailable
    2891 ResourceManager
    9386 RunJar
    10021 Jps
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    10079 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    10090 Jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    10099 Jps
    2891 ResourceManager
    2992 NodeManager
    2569 DataNode

