  • MR Framework --> Word4

    • User traffic sorting

    Implementation code:

    FlowBean class: wraps the upstream traffic, downstream traffic, and total traffic in a single bean; note that it must implement Hadoop's serialization interface, Writable.

    package com.hp.mr;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FlowBean implements Writable {
        // private fields
        private int upFlow;
        private int downFlow;
        private int sumFlow;
        // no-argument constructor (Hadoop needs this to instantiate the bean during deserialization)
        public FlowBean() {
            
        }
        // parameterized constructor
        public FlowBean(int upFlow,int downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
        // getters and setters
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
    
        public int getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }
        // deserialization: read the fields in the same order they were written
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readInt();
            downFlow = in.readInt();
            // sumFlow is not serialized, so recompute it here
            sumFlow = upFlow + downFlow;
        }
        // serialization: write the fields
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(upFlow);
            out.writeInt(downFlow);
        }
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    
    }
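
    A quick way to confirm that write and readFields agree is to round-trip a bean through a byte array outside of MapReduce. The snippet below is only an illustrative sketch; the class name FlowBeanTest and the sample values are made up for this example:

    package com.hp.mr;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class FlowBeanTest {
        public static void main(String[] args) throws IOException {
            // serialize a bean into an in-memory byte array
            FlowBean before = new FlowBean(100, 200);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            before.write(new DataOutputStream(bos));
            // deserialize it back; the fields are read in the same order they were written
            FlowBean after = new FlowBean();
            after.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(after);   // prints the tab-separated values 100, 200, 300
        }
    }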

    Note: the approach to summing each user's upstream and downstream traffic is the same as in the previous examples; what is new is the use of FlowBean, i.e. wrapping the multiple statistics to be collected in a single bean object. Pay attention to the KEY and VALUE data types declared on the Mapper and Reducer. A hypothetical sample record is sketched below.
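
    Each input record is assumed here to be a tab-separated line in which column 1 is the phone number and columns 2 and 3 are the upstream and downstream traffic; the sample line below is hypothetical, chosen only to match the indices words[1], words[2], words[3] used in the Mapper:

    1363157985066	13726230503	2481	24681

    For such a line the Mapper emits the key 13726230503 with the value FlowBean(2481, 24681).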

    Mapper class:

    package com.hp.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // read one line of input
            String line = value.toString();
            // split the line on the tab delimiter
            String[] words = line.split("\t");
            // pick out the fields of interest: phone number, upstream and downstream traffic
            String phone = words[1];
            String upFlow = words[2];
            String downFlow = words[3];
            int up = Integer.parseInt(upFlow);
            int down = Integer.parseInt(downFlow);
            FlowBean fs = new FlowBean(up, down);
            // emit (phone number, FlowBean) to the context
            context.write(new Text(phone), fs);
        }
    }

    Reducer class:

    package com.hp.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // running totals for this phone number
            int up = 0;
            int down = 0;
            // accumulate the upstream and downstream traffic of every record for this key
            for (FlowBean flowBean : values) {
                up += flowBean.getUpFlow();
                down += flowBean.getDownFlow();
            }
            FlowBean fl = new FlowBean(up, down);
            // emit (phone number, totals) to the context
            context.write(key, fl);
        }
    }
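
    Continuing the hypothetical sample above (and assuming the phone number occurs in only one input record), the final output line for that key would be the key followed by FlowBean.toString(), i.e. tab-separated values such as

    13726230503	2481	24681	27162

    where the last column is the total traffic computed in the FlowBean constructor.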

    Submitter class:

    package com.hp.mr;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class Submitter {
        public static void main(String[] args) throws Exception {
            // load the configuration
            Configuration conf = new Configuration();
            // create a FileSystem object
            FileSystem fs = FileSystem.get(conf);
            // delete the output path if it already exists
            if (fs.exists(new Path(args[1]))) {
                fs.delete(new Path(args[1]), true);
            }
            // create the Job object
            Job job = Job.getInstance(conf);
            // set the main class whose jar will be submitted
            job.setJarByClass(Submitter.class);
            // Mapper-related settings
            job.setMapperClass(FlowMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // Reducer-related settings
            job.setReducerClass(FlowReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // input path
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }
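
    Once the three classes are packaged into a jar, the job can be submitted with the standard hadoop jar command; the jar name and HDFS paths below are placeholders:

    hadoop jar flow.jar com.hp.mr.Submitter /input/flow /output/flow

    Here args[0] is the input directory and args[1] is the output directory, which the Submitter deletes first if it already exists.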
  • Original post: https://www.cnblogs.com/wyk1/p/13964811.html