  • MapReduce in Action (3): Implementing Partitioning

    Requirement:

    Building on Part (1) of this series, implement a custom partitioning mechanism: for example, split the records into different provinces by phone-number prefix, process each province on a different reducer, and write each province's results to its own output file.

    Aggregate the traffic in the raw traffic log and write the per-province user statistics to separate output files.

    Approach:

    Two mechanisms need to be customized:
    1. Replace the partitioning logic with a custom Partitioner that decides how records are grouped.

    A Partitioner splits the intermediate results produced by the mappers so that all records in the same partition go to the same reducer; it directly affects load balancing in the reduce phase.
        Partitioner exposes a single method:
        getPartition(KEY key, VALUE value, int numPartitions)
        The first two parameters are the map output key and value, and numPartitions is the number of reducers.


    2. Set the number of concurrent reduce tasks so that multiple reducers work in parallel; a short sketch of both pieces follows this list.
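
    For reference, Hadoop's default HashPartitioner does essentially the following, and the driver registers the custom partitioner together with a matching number of reduce tasks. The snippet below is only a sketch of that wiring; the real code appears later in FlowSumArea.main:

    // Roughly what org.apache.hadoop.mapreduce.lib.partition.HashPartitioner does:
    // the key's hash code is mapped onto the range [0, numReduceTasks)
    public int getPartition(KEY key, VALUE value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Driver-side wiring (shown in full in FlowSumArea below):
    job.setPartitionerClass(AreaPartitioner.class);
    job.setNumReduceTasks(6); // one reduce task per partition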

    The project layout is as follows:

    AreaPartitioner.java:

    package cn.darrenchan.hadoop.mr.areapartition;
    
    import java.util.HashMap;
    
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
    
        private static HashMap<String,Integer> areaMap = new HashMap<>();
        
        /**
         * The mapping is hard-coded here for simplicity; in practice this could query a
         * database and return the area code for a given phone-number prefix.
         */
        static{
            areaMap.put("135", 0);
            areaMap.put("136", 1);
            areaMap.put("137", 2);
            areaMap.put("138", 3);
            areaMap.put("139", 4);
        }
        
        @Override
        public int getPartition(KEY key, VALUE value, int numPartitions) {
            // Take the phone number from the key, look up its 3-digit prefix in the area
            // dictionary, and return a different partition number per province (5 for unknown prefixes)
            Integer areaCode = areaMap.get(key.toString().substring(0, 3));
            return areaCode == null ? 5 : areaCode;
        }
    
    }
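
    As a quick sanity check of the mapping, a snippet like the following could be used (hypothetical sample phone numbers, not part of the project; the value argument is unused by AreaPartitioner, so null is passed):

    AreaPartitioner<Text, FlowBean> partitioner = new AreaPartitioner<>();
    System.out.println(partitioner.getPartition(new Text("13502468823"), null, 6)); // 0 (prefix "135")
    System.out.println(partitioner.getPartition(new Text("18320173382"), null, 6)); // 5 (unknown prefix)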

    FlowSumArea.java:

    package cn.darrenchan.hadoop.mr.areapartition;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import cn.darrenchan.hadoop.mr.flow.FlowBean;
    
    /**
     * Aggregate the traffic in the raw traffic log and write the per-province user
     * statistics to separate output files.
     * Two mechanisms need to be customized:
     * 1. Replace the partitioning logic with a custom Partitioner
     * 2. Set the number of concurrent reducer tasks
     * 
     */
    public class FlowSumArea {
    
        public static class FlowSumAreaMapper extends
                Mapper<LongWritable, Text, Text, FlowBean> {
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
    
                // Read one line of input
                String line = value.toString();
                // Split the line into tab-separated fields
                String[] fields = StringUtils.split(line, "\t");

                // Pick out the fields we need
                String phoneNum = fields[1];
                long upFlow = Long.parseLong(fields[7]);
                long downFlow = Long.parseLong(fields[8]);

                // Wrap the data in a key/value pair and emit it
                context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow,
                        downFlow));
    
            }
    
        }
    
        public static class FlowSumAreaReducer extends
                Reducer<Text, FlowBean, Text, FlowBean> {
    
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values,
                    Context context) throws IOException, InterruptedException {
    
                long up_flow_counter = 0;
                long d_flow_counter = 0;
    
                for (FlowBean bean : values) {
    
                    up_flow_counter += bean.getUpFlow();
                    d_flow_counter += bean.getDownFlow();
    
                }
    
                context.write(key, new FlowBean(key.toString(), up_flow_counter,
                        d_flow_counter));
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(FlowSumArea.class);
    
            job.setMapperClass(FlowSumAreaMapper.class);
            job.setReducerClass(FlowSumAreaReducer.class);
    
            // Register our custom partitioning logic
            job.setPartitionerClass(AreaPartitioner.class);
          
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // Set the number of concurrent reduce tasks; it should match the number of
            // partitions. Setting it to 1 also works (everything goes to one reducer),
            // 2 through 5 fail, and 7 or more work but the extra reducers just produce empty files.
            job.setNumReduceTasks(6);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
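
    Why 6? If the number of reduce tasks is greater than 1 but smaller than the largest partition number the partitioner can return, the job fails with an "Illegal partition" error, because some keys are assigned to a reduce task that does not exist. With exactly one reduce task the partitioner is effectively bypassed and everything ends up in a single output file, and with more than six reducers the job still runs, the extra reducers simply producing empty output files.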

    FlowBean.java:

    package cn.darrenchan.hadoop.mr.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    public class FlowBean implements WritableComparable<FlowBean> {
        private String phoneNum;// phone number
        private long upFlow;// upstream traffic
        private long downFlow;// downstream traffic
        private long sumFlow;// total traffic
    
        public FlowBean() {
            super();
        }
    
        public FlowBean(String phoneNum, long upFlow, long downFlow) {
            super();
            this.phoneNum = phoneNum;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public String getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(String phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    
        // Deserialize the object's fields from the data stream.
        // Fields must be read in the same order in which they were serialized.
        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        // Serialize the object's fields to the data stream
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        // Sort by total traffic in descending order
        @Override
        public int compareTo(FlowBean flowBean) {
            return sumFlow > flowBean.getSumFlow() ? -1 : 1;
        }
    
    }

    Package the project as area.jar and run:

    hadoop jar area.jar cn.darrenchan.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/outputarea

    The run produces the following output:

    17/02/26 09:10:54 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
    17/02/26 09:10:54 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/02/26 09:10:55 INFO input.FileInputFormat: Total input paths to process : 1
    17/02/26 09:10:55 INFO mapreduce.JobSubmitter: number of splits:1
    17/02/26 09:10:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0005
    17/02/26 09:10:55 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0005
    17/02/26 09:10:55 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0005/
    17/02/26 09:10:55 INFO mapreduce.Job: Running job: job_1488112052214_0005
    17/02/26 09:11:01 INFO mapreduce.Job: Job job_1488112052214_0005 running in uber mode : false
    17/02/26 09:11:01 INFO mapreduce.Job: map 0% reduce 0%
    17/02/26 09:11:07 INFO mapreduce.Job: map 100% reduce 0%
    17/02/26 09:11:19 INFO mapreduce.Job: map 100% reduce 17%
    17/02/26 09:11:23 INFO mapreduce.Job: map 100% reduce 33%
    17/02/26 09:11:26 INFO mapreduce.Job: map 100% reduce 50%
    17/02/26 09:11:27 INFO mapreduce.Job: map 100% reduce 83%
    17/02/26 09:11:28 INFO mapreduce.Job: map 100% reduce 100%
    17/02/26 09:11:28 INFO mapreduce.Job: Job job_1488112052214_0005 completed successfully
    17/02/26 09:11:28 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=1152
    FILE: Number of bytes written=652142
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=2338
    HDFS: Number of bytes written=526
    HDFS: Number of read operations=21
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=12
    Job Counters
    Launched map tasks=1
    Launched reduce tasks=6
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=2663
    Total time spent by all reduces in occupied slots (ms)=83315
    Total time spent by all map tasks (ms)=2663
    Total time spent by all reduce tasks (ms)=83315
    Total vcore-seconds taken by all map tasks=2663
    Total vcore-seconds taken by all reduce tasks=83315
    Total megabyte-seconds taken by all map tasks=2726912
    Total megabyte-seconds taken by all reduce tasks=85314560
    Map-Reduce Framework
    Map input records=22
    Map output records=22
    Map output bytes=1072
    Map output materialized bytes=1152
    Input split bytes=124
    Combine input records=0
    Combine output records=0
    Reduce input groups=21
    Reduce shuffle bytes=1152
    Reduce input records=22
    Reduce output records=21
    Spilled Records=44
    Shuffled Maps =6
    Failed Shuffles=0
    Merged Map outputs=6
    GC time elapsed (ms)=524
    CPU time spent (ms)=3210
    Physical memory (bytes) snapshot=509775872
    Virtual memory (bytes) snapshot=2547916800
    Total committed heap usage (bytes)=218697728
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=2214
    File Output Format Counters
    Bytes Written=526

    After the job completes, we find that six output files have been generated this time:
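
    Each reduce task writes its own part file, so the output directory should contain six part files named according to the standard MapReduce convention (the listing below is an expectation based on that convention, not captured output):

    hadoop fs -ls /flow/outputarea
    /flow/outputarea/_SUCCESS
    /flow/outputarea/part-r-00000
    /flow/outputarea/part-r-00001
    /flow/outputarea/part-r-00002
    /flow/outputarea/part-r-00003
    /flow/outputarea/part-r-00004
    /flow/outputarea/part-r-00005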

    The final results show that the records were indeed partitioned into the groups we expected:

    While the job was running, we kept monitoring it with jps to see whether six reduce tasks were really working at the same time; at its peak there were indeed six YarnChild processes, which confirms that the program behaves as intended.

    Last login: Sun Feb 26 04:26:01 2017 from 192.168.230.1
    [hadoop@weekend110 ~] jps
    2473 NameNode
    8703 RunJar
    9214 Jps
    9029 YarnChild
    8995 YarnChild
    2747 SecondaryNameNode
    8978 -- process information unavailable
    2891 ResourceManager
    2992 NodeManager
    8799 MRAppMaster
    9053 YarnChild
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    2747 SecondaryNameNode
    2891 ResourceManager
    2992 NodeManager
    8799 MRAppMaster
    2569 DataNode
    9330 Jps
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    9495 Jps
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    9558 Jps
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9580 Jps
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9598 YarnChild
    9482 MRAppMaster
    2747 SecondaryNameNode
    9623 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9650 Jps
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9665 YarnChild
    2747 SecondaryNameNode
    9681 YarnChild
    9696 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9772 Jps
    9482 MRAppMaster
    9665 YarnChild
    2747 SecondaryNameNode
    9681 YarnChild
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9817 Jps
    9665 -- process information unavailable
    2747 SecondaryNameNode
    9681 YarnChild
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    9681 YarnChild
    9872 Jps
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    9921 Jps
    2747 SecondaryNameNode
    9770 YarnChild
    9751 YarnChild
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    9730 YarnChild
    2569 DataNode
    9704 YarnChild
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    9770 YarnChild
    9751 -- process information unavailable
    2891 ResourceManager
    9386 RunJar
    10021 Jps
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    10079 Jps
    2891 ResourceManager
    9386 RunJar
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    10090 Jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    2891 ResourceManager
    2992 NodeManager
    2569 DataNode
    [hadoop@weekend110 ~] jps
    2473 NameNode
    9482 MRAppMaster
    2747 SecondaryNameNode
    10099 Jps
    2891 ResourceManager
    2992 NodeManager
    2569 DataNode

  • Original article: https://www.cnblogs.com/DarrenChan/p/6464259.html