MapReduce Workflow: Partitioning and Sorting

    1. The full MapReduce workflow:

    (Workflow diagrams omitted.) In outline: input splits are read by the Mapper; map output is partitioned and sorted in the in-memory buffer, then spilled and merged on the map side; each Reducer fetches its partition from every map task (the shuffle), merge-sorts what it fetched, and calls reduce() once per group of equal keys before writing the final output.

    2. Partitioning (Partition)

    (Diagrams omitted.) Partitioning determines which reduce task, and therefore which output file, each map output record is sent to. The default mapping can be replaced with a custom Partitioner, as in the example below.

    Hands-on Partitioner example

    1. Requirement

    Output the phone-traffic statistics to different files (partitions) according to the phone number's home province. Expected output: numbers starting with 136, 137, 138, and 139 each go to a separate file, and all other prefixes go to one shared file (five files in total).

    The code is as follows:

    package partiton;
    import flow.FlowBean;
    import flow.FlowMapper;
    import flow.FlowReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    
    import java.io.IOException;
    public class partitonDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Get a Job instance
            Job job = Job.getInstance(new Configuration());
            // 2. Set the driver class so Hadoop can locate the jar
            job.setJarByClass(partitonDriver.class);
            // 3. Set the Mapper and Reducer (reused from the earlier flow-count job)
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            // Five reduce tasks, one per partition number (0-4) returned by MyPartitioner
            job.setNumReduceTasks(5);
            job.setPartitionerClass(MyPartitioner.class);
            // 4. Set the map output and final output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // 5. Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 6. Submit the job and wait for completion
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
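
    Since the driver takes its input and output paths from args, a typical submission looks like the following (the jar name and HDFS paths are placeholders, not from the original post):

    hadoop jar flowcount.jar partiton.partitonDriver /flow/input /flow/output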
    

      

    package partiton;
    
    import flow.FlowBean;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class MyPartitioner extends Partitioner<Text, FlowBean> {
        // Return the partition number (0-4) based on the phone-number prefix
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            String phone = text.toString();
            switch (phone.substring(0, 3)) {
                case "136":
                    return 0;
                case "137":
                    return 1;
                case "138":
                    return 2;
                case "139":
                    return 3;
                default:
                    return 4;
            }
        }
    }
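
    For comparison (standard Hadoop behavior, paraphrased from the Hadoop source rather than taken from the original post), the default HashPartitioner that this class replaces spreads keys by hash code:

    import org.apache.hadoop.mapreduce.Partitioner;
    
    // Default partitioner: hash the key, clear the sign bit, mod by reducer count.
    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    Whichever partitioner is used, getPartition must return a value in [0, numReduceTasks): with more than one but fewer than five reduce tasks here, returning 4 fails the map task with an "Illegal partition" error; with more than five, the extra output files are simply empty; and with exactly one reduce task the partitioner is never invoked.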
    

    After a successful run, the statistics are stored in five separate files, one per partition, confirming that the partitioning took effect.

    3. Sorting

    Sorting is one of the most important operations in the MapReduce framework. Both MapTask and ReduceTask sort data by key; this is default Hadoop behavior, so data in every application gets sorted whether or not the logic requires it. The default order is lexicographic, and the sort is implemented with quicksort.

    (1) Partial sort
    MapReduce sorts the dataset by the keys of the input records, guaranteeing that each output file is internally sorted.
    (2) Total sort
    The final output is a single, fully sorted file. The simple way is to run only one ReduceTask, but this is extremely inefficient for large inputs because one machine processes all the data, forfeiting the parallelism MapReduce provides.
    (3) Auxiliary sort (GroupingComparator)
    Groups keys on the Reduce side. Use it when the key is a bean object and you want keys that agree on one or a few fields (but differ on a full comparison) to arrive in the same reduce() call; see the grouping-comparator sketch after this list.
    (4) Secondary sort
    In a custom sort, comparing on two criteria inside compareTo() constitutes a secondary sort.
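
    A minimal sketch of such a grouping comparator, assuming the FlowBean class defined below and grouping on its upFlow field (the class name and choice of grouping field are illustrative, not from the original post):

    package writablecomparable;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Hypothetical grouping comparator: all FlowBean keys with equal upFlow
    // are delivered to one reduce() call, even if their other fields differ.
    public class FlowGroupingComparator extends WritableComparator {
        protected FlowGroupingComparator() {
            super(FlowBean.class, true); // true => instantiate keys for comparing
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            FlowBean x = (FlowBean) a;
            FlowBean y = (FlowBean) b;
            return Long.compare(x.getUpFlow(), y.getUpFlow());
        }
    }

    It would be registered with job.setGroupingComparatorClass(FlowGroupingComparator.class); the sorting example below does not use it.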

    The code is as follows:

    package writablecomparable;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    // Implements WritableComparable (rather than Writable plus Comparable
    // separately): Hadoop's shuffle casts the map output key class to
    // WritableComparable when building the sort comparator.
    public class FlowBean implements WritableComparable<FlowBean> {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
        // No-arg constructor, required so the framework can instantiate the bean
        public FlowBean() {
        }
        public void set(long upFlow, long downFlow) {
            this.downFlow = downFlow;
            this.upFlow = upFlow;
            this.sumFlow = upFlow + downFlow;
        }
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
        // Serialization: write the fields out
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(sumFlow);
        }
        // Deserialization: read the fields back in
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
            sumFlow = dataInput.readLong();
        }
    
        // Sort by total flow in descending order
        @Override
        public int compareTo(FlowBean o) {
            return Long.compare(o.sumFlow, this.sumFlow);
        }
        // write() and readFields() must handle the fields in the same order
    }
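
    As a quick sanity check of the Writable contract (a hedged sketch, not part of the original post), the bean should survive a serialize/deserialize round trip:

    package writablecomparable;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    // Hypothetical round-trip check: what write() emits, readFields() must restore.
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            FlowBean in = new FlowBean();
            in.set(100, 200);
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            in.write(new DataOutputStream(buf));
            FlowBean out = new FlowBean();
            out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            System.out.println(out); // prints 100, 200, 300 separated by tabs
        }
    }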
    

      

    package writablecomparable;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Keys arrive sorted by total flow (descending, per FlowBean.compareTo).
            // Iterate the values in case several phones share one total, and swap
            // key and value back so each output line reads: phone \t flow fields.
            for (Text value : values) {
                context.write(value, key);
            }
        }
    }
    

      

    package writablecomparable;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SortDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(SortDriver.class);
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
    
            // FlowBean is the map output key, so the shuffle sorts on it
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            // Backslashes must be escaped in Java string literals, and the output
            // directory must not exist yet (the "output" subdirectory name is illustrative)
            FileInputFormat.setInputPaths(job, new Path("D:\\wev"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\wev\\output"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    

      

    package writablecomparable;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        private FlowBean flow = new FlowBean();
        private Text phone = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line (output of the earlier flow-count job):
            // phone \t upFlow \t downFlow \t sumFlow
            String[] fields = value.toString().split("\t");
            phone.set(fields[0]);
            flow.setUpFlow(Long.parseLong(fields[1]));
            flow.setDownFlow(Long.parseLong(fields[2]));
            flow.setSumFlow(Long.parseLong(fields[3]));
            // Emit the bean as the key so the shuffle sorts by total flow
            context.write(flow, phone);
        }
    }
    

      

    The run results show the records sorted by total flow in descending order.

    4. Sorting combined with partitioning

    Driver:

    package writablecomprable;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import writablecomparable.FlowBean;
    import writablecomparable.SortMapper;
    import writablecomparable.SortReducer;
    
    import java.io.IOException;
    
    public class SortDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(SortDriver.class);
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
    
            // Combine both techniques: partition by phone prefix, sort by FlowBean key
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(5);
    
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            // Backslashes escaped; the output directory must not exist yet
            // (the "output2" subdirectory name is illustrative)
            FileInputFormat.setInputPaths(job, new Path("D:\\wev"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\wev\\output2"));
    
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    

    Partitioner:

    package writablecomprable;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    import writablecomparable.FlowBean;
    
    // Partitions on the value (the phone number): the key is now the FlowBean
    // used for sorting, so the prefix must be read from the Text value instead.
    public class MyPartitioner extends Partitioner<FlowBean, Text> {
        @Override
        public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
            switch (text.toString().substring(0, 3)) {
                case "136":
                    return 0;
                case "137":
                    return 1;
                case "138":
                    return 2;
                case "139":
                    return 3;
                default:
                    return 4;
            }
        }
    }
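
    With five reduce tasks, the job should produce five output files under Hadoop's standard part-file naming, each internally sorted by total flow in descending order:

    part-r-00000    phone numbers starting with 136
    part-r-00001    phone numbers starting with 137
    part-r-00002    phone numbers starting with 138
    part-r-00003    phone numbers starting with 139
    part-r-00004    all other prefixes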
    

    Running the job produces output matching this layout: records are split into five files by phone prefix, and each file is sorted by total flow.
