  • MapReduce in Practice (2): Sorting by a Custom Type

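    Requirement:

    Building on the previous exercise, I want the results output in descending order of total traffic.

    Approach:

    MapReduce sorts map output by key during the shuffle; a Text key, for example, is ordered lexicographically. To sort by any other rule, make the key a custom class and give that class a compareTo method (return a positive number when this object is greater than the one being compared, 0 when they are equal, and a negative number when it is less).

    Note: implementing only java.lang.Comparable ends in an error when the job runs, since Hadoop expects a key to be both serializable and comparable. Implement WritableComparable directly instead.

    FlowBean.java is changed as follows: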
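
    To make the ordering rule concrete outside Hadoop, here is a minimal plain-Java sketch. The Usage class and the DescendingSortDemo helper are hypothetical stand-ins invented for illustration, not part of the original code; they only show how a compareTo that puts larger totals first drives an ordinary sort.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a traffic record; not the Hadoop bean itself.
class Usage implements Comparable<Usage> {
    final String phoneNum;
    final long sumFlow;

    Usage(String phoneNum, long sumFlow) {
        this.phoneNum = phoneNum;
        this.sumFlow = sumFlow;
    }

    // Larger totals sort first; equal totals fall back to the phone number.
    @Override
    public int compareTo(Usage other) {
        int byTotal = Long.compare(other.sumFlow, this.sumFlow);
        return byTotal != 0 ? byTotal : phoneNum.compareTo(other.phoneNum);
    }
}

class DescendingSortDemo {
    // Returns the phone numbers in the order produced by Usage.compareTo.
    static List<String> sortedPhones(List<Usage> records) {
        List<Usage> copy = new ArrayList<>(records);
        Collections.sort(copy);
        List<String> phones = new ArrayList<>();
        for (Usage u : copy) {
            phones.add(u.phoneNum);
        }
        return phones;
    }

    public static void main(String[] args) {
        List<Usage> records = new ArrayList<>();
        records.add(new Usage("A", 200));
        records.add(new Usage("B", 187052));
        records.add(new Usage("C", 380));
        System.out.println(sortedPhones(records)); // largest total first
    }
}
```

    In the MapReduce job the framework performs the sort itself during the shuffle, so the key class only has to supply the comparison.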

    package cn.darrenchan.hadoop.mr.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class FlowBean implements WritableComparable<FlowBean> {
        private String phoneNum;// phone number
        private long upFlow;// upstream traffic
        private long downFlow;// downstream traffic
        private long sumFlow;// total traffic
    
        public FlowBean() {
            super();
        }
    
        public FlowBean(String phoneNum, long upFlow, long downFlow) {
            super();
            this.phoneNum = phoneNum;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public String getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    
        // Deserialize the object's fields from the data stream.
        // Fields must be read in the same order in which they were written.
        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        // Serialize the object's fields to the stream
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
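        @Override
        public int compareTo(FlowBean flowBean) {
            // Larger totals sort first. Ties fall back to the phone number, so the
            // comparison is consistent and distinct records never compare as equal.
            int byTotal = Long.compare(flowBean.getSumFlow(), sumFlow);
            return byTotal != 0 ? byTotal : phoneNum.compareTo(flowBean.getPhoneNum());
        }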
    
    }
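
    The comments on write and readFields stress that fields must be read back in the order they were written. The following sketch illustrates that round trip using only java.io; the FlowRecord class and the RoundTripDemo helper are hypothetical and merely mirror some of the bean's fields, while Hadoop itself drives the equivalent methods on the real bean.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical mirror of the bean's fields, without the Hadoop interfaces.
class FlowRecord {
    String phoneNum = "";
    long upFlow;
    long downFlow;

    // Write order: phoneNum, upFlow, downFlow.
    void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // Read order must match the write order exactly.
    void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
    }
}

class RoundTripDemo {
    // Serializes a record to a byte array and reads it back into a new object.
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            FlowRecord copy = new FlowRecord();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord r = new FlowRecord();
        r.phoneNum = "1363157985069";
        r.upFlow = 186852L;
        r.downFlow = 200L;
        FlowRecord copy = roundTrip(r);
        System.out.println(copy.phoneNum + "\t" + copy.upFlow + "\t" + copy.downFlow);
    }
}
```

    If readFields read the two long values before the string, the bytes would be misinterpreted, which is why the order is kept identical in both methods.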

    Create the file SortMR.java:

    package cn.darrenchan.hadoop.mr.flowsort;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import cn.darrenchan.hadoop.mr.flow.FlowBean;
    
    // Run with: hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort
    public class SortMR {
        public static class SortMapper extends
                Mapper<LongWritable, Text, FlowBean, NullWritable> {
            // Take one line, split it into fields, wrap them in a FlowBean and emit it as the key
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] words = StringUtils.split(line, "\t");
    
                String phoneNum = words[0];
                long upFlow = Long.parseLong(words[1]);
                long downFlow = Long.parseLong(words[2]);
    
                context.write(new FlowBean(phoneNum, upFlow, downFlow),
                        NullWritable.get());
            }
        }
    
        public static class SortReducer extends
                Reducer<FlowBean, NullWritable, Text, FlowBean> {
            @Override
            protected void reduce(FlowBean key, Iterable<NullWritable> values,
                    Context context) throws IOException, InterruptedException {
                String phoneNum = key.getPhoneNum();
                context.write(new Text(phoneNum), key);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(SortMR.class);
    
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
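
    One detail worth keeping in mind when a bean is used as the key: reduce is called once per group of keys that the comparator treats as equal, assuming the default behaviour in which grouping uses the same comparison as sorting (the job above sets no grouping comparator). A TreeMap shows an analogous effect in plain Java. The KeyRecord class and both comparators below are hypothetical and exist only for this sketch; the three sample records are taken from the result listing.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical key holding a phone number and its total traffic.
class KeyRecord {
    final String phone;
    final long total;

    KeyRecord(String phone, long total) {
        this.phone = phone;
        this.total = total;
    }
}

class GroupingDemo {
    // Descending by total only: records with equal totals compare as 0.
    static final Comparator<KeyRecord> BY_TOTAL_ONLY =
            (a, b) -> Long.compare(b.total, a.total);

    // Descending by total, then ascending by phone: distinct records stay distinct.
    static final Comparator<KeyRecord> BY_TOTAL_THEN_PHONE =
            BY_TOTAL_ONLY.thenComparing(k -> k.phone);

    // Counts how many groups the keys collapse into under the given ordering.
    static int groupCount(Comparator<KeyRecord> order, List<KeyRecord> keys) {
        TreeMap<KeyRecord, Integer> groups = new TreeMap<>(order);
        for (KeyRecord k : keys) {
            groups.merge(k, 1, Integer::sum);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        List<KeyRecord> keys = Arrays.asList(
                new KeyRecord("1363157985079", 380L),
                new KeyRecord("1363157986041", 380L),
                new KeyRecord("1363154400022", 200L));
        System.out.println(groupCount(BY_TOTAL_ONLY, keys));
        System.out.println(groupCount(BY_TOTAL_THEN_PHONE, keys));
    }
}
```

    With the total-only ordering the two records that share a total of 380 fall into one group; adding the phone number as a tie-breaker keeps one group per record, which is what a reducer that writes each key once relies on.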

    The input for this job is the output of the previous experiment. Package the code as flowsort.jar and run:

    hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort

    The job prints the following information:

    17/02/26 05:22:36 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
    17/02/26 05:22:36 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/02/26 05:22:36 INFO input.FileInputFormat: Total input paths to process : 1
    17/02/26 05:22:36 INFO mapreduce.JobSubmitter: number of splits:1
    17/02/26 05:22:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0003
    17/02/26 05:22:37 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0003
    17/02/26 05:22:37 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0003/
    17/02/26 05:22:37 INFO mapreduce.Job: Running job: job_1488112052214_0003
    17/02/26 05:24:16 INFO mapreduce.Job: Job job_1488112052214_0003 running in uber mode : false
    17/02/26 05:24:16 INFO mapreduce.Job: map 0% reduce 0%
    17/02/26 05:24:22 INFO mapreduce.Job: map 100% reduce 0%
    17/02/26 05:24:28 INFO mapreduce.Job: map 100% reduce 100%
    17/02/26 05:24:28 INFO mapreduce.Job: Job job_1488112052214_0003 completed successfully
    17/02/26 05:24:28 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=933
    FILE: Number of bytes written=187799
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=735
    HDFS: Number of bytes written=623
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters 
    Launched map tasks=1
    Launched reduce tasks=1
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=3077
    Total time spent by all reduces in occupied slots (ms)=2350
    Total time spent by all map tasks (ms)=3077
    Total time spent by all reduce tasks (ms)=2350
    Total vcore-seconds taken by all map tasks=3077
    Total vcore-seconds taken by all reduce tasks=2350
    Total megabyte-seconds taken by all map tasks=3150848
    Total megabyte-seconds taken by all reduce tasks=2406400
    Map-Reduce Framework
    Map input records=22
    Map output records=22
    Map output bytes=883
    Map output materialized bytes=933
    Input split bytes=112
    Combine input records=0
    Combine output records=0
    Reduce input groups=22
    Reduce shuffle bytes=933
    Reduce input records=22
    Reduce output records=22
    Spilled Records=44
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=142
    CPU time spent (ms)=1280
    Physical memory (bytes) snapshot=218406912
    Virtual memory (bytes) snapshot=726446080
    Total committed heap usage (bytes)=137433088
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters 
    Bytes Read=623
    File Output Format Counters 
    Bytes Written=623

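    The final result is shown below (columns: phone number, upstream traffic, downstream traffic, total traffic). It is sorted by total traffic in descending order.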

    1363157985069 186852 200 187052
    1363157985066 2481 24681 27162
    1363157990043 63 11058 11121
    1363157986072 18 9531 9549
    1363157982040 102 7335 7437
    1363157984041 9 6960 6969
    1363157995093 3008 3720 6728
    1363157995074 4116 1432 5548
    1363157992093 4938 200 5138
    1363157973098 27 3659 3686
    1363157995033 20 3156 3176
    1363157984040 12 1938 1950
    1363157986029 3 1938 1941
    1363157991076 1512 200 1712
    1363157993044 12 1527 1539
    1363157993055 954 200 1154
    1363157985079 180 200 380
    1363157986041 180 200 380
    1363157988072 120 200 320
    1363154400022 0 200 200
    1363157983019 0 200 200
    1363157995052 0 200 200

  • Original article: https://www.cnblogs.com/DarrenChan/p/6446154.html