  • Hadoop Learning Path: MapReduce Programming Examples

    1. The WordCount program

      Case analysis

       Requirement: count the number of occurrences of each word across multiple files.

       Analysis: 

            Map phase: turn every line of text into key-value pairs of the form <word,1>.

            Reduce phase: aggregate each group of kv pairs sharing the same word by summing all the values (illustrated below).
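
            For example, with a hypothetical input line (illustrative data only), the records flow like this:

    input line               : "hello world hello"
    map output               : <hello,1> <world,1> <hello,1>
    reduce input for "hello" : <hello,[1,1]>  ->  reduce output: <hello,2>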

      Implementation

       Coding: the Mapper class

    package cn.edu360.mr.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * KEYIN : the key type of the data the map task reads; here it is the starting byte offset of the line (a Long)
     * VALUEIN: the value type of the data the map task reads; here it is the content of the line (a String)
     * 
     * KEYOUT: the key type of the kv result returned by the user-defined map method; in the wordcount logic we return the word (a String)
     * VALUEOUT: the value type of the kv result returned by the user-defined map method; in the wordcount logic we return an integer (an Integer)
     * However, the data produced by map has to be shipped to reduce, which requires serialization and deserialization. The JDK's native
     * serialization produces rather bloated data, which would make the transfers during a mapreduce job inefficient.
     * Hadoop therefore designed its own serialization mechanism, so the data types transferred in mapreduce must implement hadoop's serialization interface.
     * 
     * For the common JDK types Long, String, Integer, Float, etc., hadoop provides wrapper types that already implement its serialization interface: LongWritable, Text, IntWritable, FloatWritable
     * @author ThinkPad
     *
     */
    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split the line into words
            String line = value.toString();
            String[] words = line.split(" ");
            for(String word:words){
                context.write(new Text(word), new IntWritable(1));
                
            }
        }
    }

       Coding: the Reducer class

    package cn.edu360.mr.wc;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        
            
            int count = 0;
            
            Iterator<IntWritable> iterator = values.iterator();
            while(iterator.hasNext()){
                
                IntWritable value = iterator.next();
                count += value.get();
            }
            
            
            context.write(key, new IntWritable(count));
            
        }
        
        
    
    }

       Coding: the job submitter class

    package cn.edu360.mr.wc;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Client program used to submit the mapreduce job.
     * Responsibilities:
     *   1. package the parameters the job needs at run time
     *   2. interact with yarn to get the mapreduce program started and running
     * @author ThinkPad
     *
     */
    public class JobSubmitter {
        
        public static void main(String[] args) throws Exception {
            
            // set a JVM system property in code so the job object knows which user identity to use when accessing HDFS
            System.setProperty("HADOOP_USER_NAME", "root");
            
            
            Configuration conf = new Configuration();
            // 1. set the default file system the job accesses at run time
            conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
            // 2. set where the job is submitted to run
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("yarn.resourcemanager.hostname", "hdp-01");
            // 3. when this submitter client runs on Windows, this cross-platform submission flag must be added
            conf.set("mapreduce.app-submission.cross-platform","true");
            
            Job job = Job.getInstance(conf);
            
            // 1. parameter: where the jar file is located
            job.setJar("d:/wc.jar");
            //job.setJarByClass(JobSubmitter.class);
            
            // 2. parameters: the Mapper and Reducer implementation classes this job calls
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);
            
            // 3. parameters: the key/value types of the results produced by this job's Mapper and Reducer implementations
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            
            
            Path output = new Path("/wordcount/output");
            FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
            if(fs.exists(output)){
                fs.delete(output, true);
            }
            
            // 4. parameters: the input path of the dataset to process and the output path for the final results
            FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job, output);  // note: the output path must not exist yet
            
            
            // 5. parameter: the number of reduce tasks to launch
            job.setNumReduceTasks(2);
            
            // 6. submit the job to yarn
            boolean res = job.waitForCompletion(true);
            
            System.exit(res?0:-1);
            
        }
        
        
    
    }

    2. The WordCount top-N program

      Case analysis

       Requirement: read the attached file request.dat and find the top N most-visited sites (a global top N, so only one reduce worker may be used).

       Approach: 

         Map phase: parse each line and emit the domain name as the key and 1 as the value (a hypothetical sample line is shown after this list).

         Reduce phase:

           In the reduce method: sum the group of 1s belonging to one domain, then put <domain, total count> into a TreeMap member variable.

           In the cleanup method: pick the n domains with the highest counts out of the TreeMap and write them as the result.
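
           For illustration, suppose each line of request.dat is whitespace-separated with the visited domain in the second field (these sample lines are hypothetical; the real file is in the attachment):

    2019/05/01 qq.com
    2019/05/01 sina.com.cn
    2019/05/01 qq.com

           The map phase would emit <qq.com,1> <sina.com.cn,1> <qq.com,1>, and the reduce phase would aggregate them into <qq.com,2> and <sina.com.cn,1> before cleanup picks the top n out of the TreeMap.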

      Implementation

       Coding: the PageCount class

    package cn.edu360.mr.page.topn;
    
    public class PageCount implements Comparable<PageCount>{
        
        private String page;
        private int count;
        
        public void set(String page, int count) {
            this.page = page;
            this.count = count;
        }
        
        public String getPage() {
            return page;
        }
        public void setPage(String page) {
            this.page = page;
        }
        public int getCount() {
            return count;
        }
        public void setCount(int count) {
            this.count = count;
        }
    
        @Override
        public int compareTo(PageCount o) {
            
            return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
        }
        
        
    
    }

       Coding: the Mapper class

    package cn.edu360.mr.page.topn;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1]), new IntWritable(1));
        }
    
    }

       Coding: the Reducer class

    package cn.edu360.mr.page.topn;
    
    import java.io.IOException;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.TreeMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        
        TreeMap<PageCount, Object> treeMap = new TreeMap<>();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            PageCount pageCount = new PageCount();
            pageCount.set(key.toString(), count);
            
            treeMap.put(pageCount,null);
            
        }
        
        
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            int topn = conf.getInt("top.n", 5);
            
            
            Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet();
            int i= 0;
            
            for (Entry<PageCount, Object> entry : entrySet) {
                context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
                i++;
                if(i==topn) return;
            }
            
            
        }
        
    
    }

       Coding: the job submitter class

    package cn.edu360.mr.page.topn;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JobSubmitter {
    
        public static void main(String[] args) throws Exception {
    
            /**
             * Option 1: parse parameters from the *-site.xml files on the classpath
             */
            Configuration conf = new Configuration();
            conf.addResource("xx-oo.xml");
            
            /**
             * Option 2: set parameters directly in code
             */
            //conf.setInt("top.n", 3);
            //conf.setInt("top.n", Integer.parseInt(args[0]));
            
            /**
             * Option 3: read parameters from a properties file
             */
            /*Properties props = new Properties();
            props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
            conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));*/
            
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(JobSubmitter.class);
    
            job.setMapperClass(PageTopnMapper.class);
            job.setReducerClass(PageTopnReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("F:\mrdata\url\input"));
            FileOutputFormat.setOutputPath(job, new Path("F:\mrdata\url\output"));
    
            job.waitForCompletion(true);
    
        }
    
    
    
    }

    3. Custom types in MapReduce

      Case analysis

       Requirement: from the data file, compute the total upstream traffic, total downstream traffic, and total traffic of every user.

       Approach:

         Map phase: split each line on tabs into fields, take the phone number as the output key, and package the traffic figures into a FlowBean object as the output value (a hypothetical input line is shown after this list).

         Reduce phase: iterate over all the values (FlowBeans) of one group and accumulate them, then output the phone number as the key and a bean holding the traffic totals as the value.

         Note: the point of this case is how a custom type implements hadoop's serialization interface.

         A custom data type such as FlowBean must implement hadoop's serialization interface, Writable, and its two methods:

         readFields(in)   the deserialization method

         write(out)   the serialization method
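
         For illustration, a hypothetical tab-separated input line might look like this (made-up values, with "..." standing for fields the mapper ignores; it only uses the phone number in the second field and the two flow figures in the 3rd- and 2nd-to-last fields):

    1363157985066  13800000000  192.168.1.100  ...  2481  24681  200

         For such a line the map phase emits <"13800000000", FlowBean("13800000000", 2481, 24681)>, and the reduce phase sums the upstream/downstream flows of all records that share the same phone number.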

      Implementation

       Coding: the FlowBean class

    package cn.edu360.mr.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * This case demonstrates how a custom data type implements hadoop's serialization interface
     * 1. the class must keep a no-argument constructor
     * 2. the order in which write() outputs the fields' binary data must match the order in which readFields() reads them back
     * 
     * @author ThinkPad
     *
     */
    public class FlowBean implements Writable {
    
        private int upFlow;
        private int dFlow;
        private String phone;
        private int amountFlow;
    
        public FlowBean(){}
        
        public FlowBean(String phone, int upFlow, int dFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.dFlow = dFlow;
            this.amountFlow = upFlow + dFlow;
        }
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getdFlow() {
            return dFlow;
        }
    
        public void setdFlow(int dFlow) {
            this.dFlow = dFlow;
        }
    
        public int getAmountFlow() {
            return amountFlow;
        }
    
        public void setAmountFlow(int amountFlow) {
            this.amountFlow = amountFlow;
        }
    
        /**
     * method called by the hadoop framework when it serializes an object of this class
         */
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeInt(upFlow);
            out.writeUTF(phone);
            out.writeInt(dFlow);
            out.writeInt(amountFlow);
    
        }
    
        /**
     * method called by the hadoop framework when it deserializes an object of this class
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readInt();
            this.phone = in.readUTF();
            this.dFlow = in.readInt();
            this.amountFlow = in.readInt();
        }
    
        @Override
        public String toString() {
             
            return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;
        }
        
    }

       Coding: the Mapper class

    package cn.edu360.mr.flow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            
            String line = value.toString();
            String[] fields = line.split("\t");
            
            String phone = fields[1];
            
            int upFlow = Integer.parseInt(fields[fields.length-3]);
            int dFlow = Integer.parseInt(fields[fields.length-2]);
            
            context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow));
        }
        
    
    }

       Coding: the Reducer class

    package cn.edu360.mr.flow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        
        
        
        /**
     *  key: a phone number
     *  values: the traffic data of all the access records produced by that phone number
         *  
         *  <135,flowBean1><135,flowBean2><135,flowBean3><135,flowBean4>
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
    
            int upSum = 0;
            int dSum = 0;
            
            for(FlowBean value:values){
                upSum += value.getUpFlow();
                dSum += value.getdFlow();
            }
            
            
            context.write(key, new FlowBean(key.toString(), upSum, dSum));
            
        }
        
    
    }

       Coding: the job submitter class

    package cn.edu360.mr.flow;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JobSubmitter {
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(JobSubmitter.class);
    
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
            
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            
        
    
            FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));
            FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\output"));
    
            job.waitForCompletion(true);
    
        }
    
    }

    4. Custom Partitioner in MapReduce

      Case analysis

       Requirement: compute each user's traffic totals and, according to the user's home region, write the results to different output files.

       Approach:

       Different partitions go to different reduce tasks, and different reduce tasks write different output files, so we only need to make the map worker partition the data the way we want: by home region.

       Implementation: define a custom Partitioner.

      Implementation

       Coding: the FlowBean class (same as in case 3)

    package cn.edu360.mr.flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * This case demonstrates how a custom data type implements hadoop's serialization interface
     * 1. the class must keep a no-argument constructor
     * 2. the order in which write() outputs the fields' binary data must match the order in which readFields() reads them back
     * 
     * @author ThinkPad
     *
     */
    public class FlowBean implements Writable {
    
        private int upFlow;
        private int dFlow;
        private String phone;
        private int amountFlow;
    
        public FlowBean(){}
        
        public FlowBean(String phone, int upFlow, int dFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.dFlow = dFlow;
            this.amountFlow = upFlow + dFlow;
        }
    
        public String getPhone() {
            return phone;
        }
    
        public void setPhone(String phone) {
            this.phone = phone;
        }
    
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getdFlow() {
            return dFlow;
        }
    
        public void setdFlow(int dFlow) {
            this.dFlow = dFlow;
        }
    
        public int getAmountFlow() {
            return amountFlow;
        }
    
        public void setAmountFlow(int amountFlow) {
            this.amountFlow = amountFlow;
        }
    
        /**
     * method called by the hadoop framework when it serializes an object of this class
         */
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeInt(upFlow);
            out.writeUTF(phone);
            out.writeInt(dFlow);
            out.writeInt(amountFlow);
    
        }
    
        /**
     * method called by the hadoop framework when it deserializes an object of this class
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readInt();
            this.phone = in.readUTF();
            this.dFlow = in.readInt();
            this.amountFlow = in.readInt();
        }
    
        @Override
        public String toString() {
             
            return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;
        }
        
    }

       Coding: the Mapper class

    package cn.edu360.mr.flow;
    
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            
            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[1];
            int upFlow = Integer.parseInt(fields[fields.length-3]);
            int dFlow = Integer.parseInt(fields[fields.length-2]);
            context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow));
        }
    }

       Coding: the Reducer class

    package cn.edu360.mr.flow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        /**
     *  key: a phone number
     *  values: the traffic data of all the access records produced by that phone number
         *  
         *  <135,flowBean1><135,flowBean2><135,flowBean3><135,flowBean4>
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            int upSum = 0;
            int dSum = 0;
            for(FlowBean value:values){
                upSum += value.getUpFlow();
                dSum += value.getdFlow();
            }
            context.write(key, new FlowBean(key.toString(), upSum, dSum));
        }
    }

       Coding: the Partitioner class

    package cn.edu360.mr.flow;
    
    import java.util.HashMap;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    /**
     * This class is used by the MapTask.
     * The MapTask calls its getPartition method to work out which reduce task each kv pair it produces should be sent to.
     * @author ThinkPad
     *
     */
    public class ProvincePartitioner extends Partitioner<Text, FlowBean>{
        static HashMap<String,Integer> codeMap = new HashMap<>();
        static{
            codeMap.put("135", 0);
            codeMap.put("136", 1);
            codeMap.put("137", 2);
            codeMap.put("138", 3);
            codeMap.put("139", 4);
            
        }
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            Integer code = codeMap.get(key.toString().substring(0, 3));
            return code==null?5:code;
        }
    }

       Coding: the job submitter class

    package cn.edu360.mr.flow;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JobSubmitter {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(JobSubmitter.class);
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
        // which partitioner class the map task uses when partitioning its output (the default HashPartitioner is used if none is specified)
            job.setPartitionerClass(ProvincePartitioner.class);
        // our ProvincePartitioner can produce 6 distinct partition numbers, so 6 reduce tasks are needed to receive them
            job.setNumReduceTasks(6);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));
    
            job.waitForCompletion(true);
    
        }
    }

    5. Custom sorting in MapReduce

      Case analysis

       Requirement: count the total number of visits to each page in request.dat, and have the data in the result file sorted by visit count in descending order.

       Approach:

         Key point:

         MapReduce has a built-in sorting mechanism:

         both the map worker and the reduce worker sort the data by key,

         so the final output is always ordered by key.

         1. First write an MR program that computes the total visit count of each page.

         2. Then write a second MR program:

         Map phase: read the result file produced by the first MR job, parse each record into a Java object (PageCount in the code below, wrapping a URL and its total count), and return that object as the key with null as the value.

           Note: this Java object must implement the WritableComparable interface so that the workers can call its compareTo method to sort.

         Reduce phase: since the worker has already sorted the incoming data by the bean's compareTo method, the reduce method only needs to write the data out; the final result is naturally ordered by total visit count.

      Implementation

       Coding: the PageCount class

    package cn.edu360.mr.page.count.sort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class PageCount implements WritableComparable<PageCount>{
        
        private String page;
        private int count;
        
        public void set(String page, int count) {
            this.page = page;
            this.count = count;
        }
        
        public String getPage() {
            return page;
        }
        public void setPage(String page) {
            this.page = page;
        }
        public int getCount() {
            return count;
        }
        public void setCount(int count) {
            this.count = count;
        }
    
        @Override
        public int compareTo(PageCount o) {
            
            return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.page);
            out.writeInt(this.count);
            
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.page= in.readUTF();
            this.count = in.readInt();
            
        }
        
        
        @Override
        public String toString() {
            return this.page + "," + this.count;
        }
        
    
    }

       Coding: the first MR job (PageCountStep1)

    package cn.edu360.mr.page.count.sort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PageCountStep1 {
        
        public static class PageCountStep1Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(" ");
                context.write(new Text(split[1]), new IntWritable(1));
            }
            
        }
        
        public static class PageCountStep1Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    
                int count = 0;
                for (IntWritable v : values) {
                    count += v.get();
                }
                
                context.write(key, new IntWritable(count));
                
            }
        }
        
        public static void main(String[] args) throws Exception {
            /**
         * Parameters can be picked up from the *-site.xml files on the classpath
             */
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(PageCountStep1.class);
            job.setMapperClass(PageCountStep1Mapper.class);
            job.setReducerClass(PageCountStep1Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\countout"));
            job.setNumReduceTasks(3);
            job.waitForCompletion(true);
            
        }
    }

       Coding: the second MR job (PageCountStep2)

    package cn.edu360.mr.page.count.sort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class PageCountStep2 {
        public static class PageCountStep2Mapper extends Mapper<LongWritable, Text, PageCount, NullWritable>{
            
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, PageCount, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                
            String[] split = value.toString().split("\t");
                PageCount pageCount = new PageCount();
                pageCount.set(split[0], Integer.parseInt(split[1]));
                context.write(pageCount, NullWritable.get());
            }
        }
        
        public static class PageCountStep2Reducer extends Reducer<PageCount, NullWritable, PageCount, NullWritable>{
            @Override
            protected void reduce(PageCount key, Iterable<NullWritable> values,
                    Reducer<PageCount, NullWritable, PageCount, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(PageCountStep2.class);
            job.setMapperClass(PageCountStep2Mapper.class);
            job.setReducerClass(PageCountStep2Reducer.class);
            job.setMapOutputKeyClass(PageCount.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(PageCount.class);
            job.setOutputValueClass(NullWritable.class);
    
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\countout"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\sortout"));
            
            job.setNumReduceTasks(1);
            job.waitForCompletion(true);
        }
    }

    6. Custom GroupingComparator in MapReduce

      Case analysis

       Requirement: given order data with the fields order id, user id, product name, price, and quantity (the sample data itself is not reproduced here; see the attachments in section 8), output for each order the top N records by transaction amount. This case combines sort control, partition control, and grouping control.

       Approach:

         Map phase: read the data, split out the fields, and package each record into a bean that is transmitted as the key; the key compares by transaction amount (a hypothetical sample line is shown after this list).

         Reduce phase: use a custom GroupingComparator to group the data by order id, then output the first N records of each group in the reduce method.
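
          For illustration, the mapper below expects comma-separated lines of the form orderId,userId,productName,price,number; the sample lines here are hypothetical:

    order001,u001,mi6,1999.9,2
    order001,u001,earphone,9.9,5
    order002,u002,mate10,2222.0,1

          For each order id, the job writes out the top N records by amount (price * number), where N is taken from the order.top.n setting in the driver.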

      Implementation

        Coding: the OrderBean class

    package cn.edu360.mr.order.topn.grouping;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class OrderBean implements WritableComparable<OrderBean>{
    
        private String orderId;
        private String userId;
        private String pdtName;
        private float price;
        private int number;
        private float amountFee;
    
        public void set(String orderId, String userId, String pdtName, float price, int number) {
            this.orderId = orderId;
            this.userId = userId;
            this.pdtName = pdtName;
            this.price = price;
            this.number = number;
            this.amountFee = price * number;
        }
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public String getPdtName() {
            return pdtName;
        }
    
        public void setPdtName(String pdtName) {
            this.pdtName = pdtName;
        }
    
        public float getPrice() {
            return price;
        }
    
        public void setPrice(float price) {
            this.price = price;
        }
    
        public int getNumber() {
            return number;
        }
    
        public void setNumber(int number) {
            this.number = number;
        }
    
        public float getAmountFee() {
            return amountFee;
        }
    
        public void setAmountFee(float amountFee) {
            this.amountFee = amountFee;
        }
    
        @Override
        public String toString() {
            return this.orderId + "," + this.userId + "," + this.pdtName + "," + this.price + "," + this.number + ","
                    + this.amountFee;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.orderId);
            out.writeUTF(this.userId);
            out.writeUTF(this.pdtName);
            out.writeFloat(this.price);
            out.writeInt(this.number);
    
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.userId = in.readUTF();
            this.pdtName = in.readUTF();
            this.price = in.readFloat();
            this.number = in.readInt();
            this.amountFee = this.price * this.number;
        }
    
    // comparison rule: compare by order id first; for records within the same order, sort by transaction amount in descending order
        @Override
        public int compareTo(OrderBean o) {
            return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getAmountFee(), this.getAmountFee()):this.orderId.compareTo(o.getOrderId());
            
        }
    
    }

        Coding: the Partitioner class

    package cn.edu360.mr.order.topn.grouping;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable>{
    
        @Override
        public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
            // distribute records to reduce tasks according to the orderId
            return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    
    }

        Coding: the GroupingComparator class

    package cn.edu360.mr.order.topn.grouping;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class OrderIdGroupingComparator extends WritableComparator{
        
        public OrderIdGroupingComparator() {
            super(OrderBean.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean o1 = (OrderBean) a;
            OrderBean o2 = (OrderBean) b;
            return o1.getOrderId().compareTo(o2.getOrderId());
        }
    }

        Coding: the MapReduce driver (OrderTopn)

    package cn.edu360.mr.order.topn.grouping;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class OrderTopn {
    
        public static class OrderTopnMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
            OrderBean orderBean = new OrderBean();
            NullWritable v = NullWritable.get();
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                
                String[] fields = value.toString().split(",");
                
                orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4]));
                
                context.write(orderBean,v);
            }
        }
        
        public static class OrderTopnReducer extends Reducer< OrderBean, NullWritable,  OrderBean, NullWritable>{
            
            /**
         * Although the reduce method exposes only a single key parameter, every step of the values iterator also updates the contents of that key object (the framework reuses the same objects)
             */
            @Override
            protected void reduce(OrderBean key, Iterable<NullWritable> values,
                    Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
                    throws IOException, InterruptedException {
            int topn = context.getConfiguration().getInt("order.top.n", 3);
            int i=0;
            for (NullWritable v : values) {
                context.write(key, v);
                if(++i==topn) return;
                }
            }
        }
        
        public static void main(String[] args) throws Exception {
    
        Configuration conf = new Configuration(); // by default only core-default.xml and core-site.xml are loaded
            conf.setInt("order.top.n", 2);
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(OrderTopn.class);
            job.setMapperClass(OrderTopnMapper.class);
            job.setReducerClass(OrderTopnReducer.class);
            job.setPartitionerClass(OrderIdPartitioner.class);
            job.setGroupingComparatorClass(OrderIdGroupingComparator.class);
            job.setNumReduceTasks(2);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
    
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\order\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\order\\out-3"));
    
            job.waitForCompletion(true);
        }
    }

    7. Input and output format control in MapReduce

      Case analysis

       Requirement: again aggregate the traffic data from case 3, then find the top N entries with the largest total traffic in the aggregated result.

       Step 1

         Approach: the aggregation logic is the same as the earlier traffic statistics:

         map: use the phone number as the key and the FlowBean as the value.

         Note: step 1 writes its result files as SequenceFiles by specifying SequenceFileOutputFormat. A SequenceFile is a file format defined by hadoop that stores a large number of serialized key-value objects (the file header also records the class names of the key and value types).

       Step 2

         Approach: to read step 1's SequenceFile result files, the input format class must be set to the SequenceFileInputFormat component.

         With this input component, our map method directly receives a deserialized KEY-VALUE pair.

       How is the top N computed?

       Send all the aggregated data to a single reduce-side worker, and define a GroupingComparator that makes that worker treat the FlowBeans of all phone numbers as one group, so the reduce method is called once; then we only need to output the first n records inside that reduce method. (The attached code below, IndexStepOne and IndexStepTwo, illustrates the same two-step SequenceFile pattern with a word-count-per-file example; a sketch of the flow top-N step 2 follows this outline.)
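
       Since the attached StepOne/StepTwo code demonstrates the SequenceFile mechanics with a different example, here is a minimal sketch of what step 2 of the flow top-N could look like, under these assumptions: step 1 wrote a SequenceFile of <Text phone, FlowBean totals>, and FlowBean has been extended to implement WritableComparable<FlowBean>, ordered by total flow descending (the FlowBean from case 3 implements only Writable). The class names FlowTopnStep2 and AllInOneGroupComparator and the property flow.top.n are invented for this sketch.

    package cn.edu360.mr.flow;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowTopnStep2 {
    
        // SequenceFileInputFormat hands the map method the deserialized <Text, FlowBean> pairs directly;
        // make the bean the output key so the shuffle sorts records by total flow (descending)
        public static class FlowTopnStep2Mapper extends Mapper<Text, FlowBean, FlowBean, Text> {
            @Override
            protected void map(Text key, FlowBean value, Context context) throws IOException, InterruptedException {
                context.write(value, key);
            }
        }
    
        // treat every key as equal for grouping, so the single reducer sees all records as one group
        public static class AllInOneGroupComparator extends WritableComparator {
            public AllInOneGroupComparator() {
                super(FlowBean.class, true);
            }
            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                return 0;
            }
        }
    
        public static class FlowTopnStep2Reducer extends Reducer<FlowBean, Text, Text, FlowBean> {
            @Override
            protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                int topn = context.getConfiguration().getInt("flow.top.n", 3);
                int i = 0;
                // the framework reuses the key object: each iteration step also moves the key to the next record's totals
                for (Text phone : values) {
                    context.write(phone, key);
                    if (++i == topn) return;
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInt("flow.top.n", 3);
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowTopnStep2.class);
            job.setMapperClass(FlowTopnStep2Mapper.class);
            job.setReducerClass(FlowTopnStep2Reducer.class);
            job.setGroupingComparatorClass(AllInOneGroupComparator.class);
            job.setNumReduceTasks(1);                                  // a global top N needs a single reducer
            job.setInputFormatClass(SequenceFileInputFormat.class);    // read step 1's SequenceFile output
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));     // step 1's SequenceFile output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));    // output directory for the top N records
            job.waitForCompletion(true);
        }
    }

       Because the grouping comparator returns 0 for every pair of keys, the lone reducer receives all records in one reduce() call, already sorted by total flow, and simply writes out the first N.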

      Implementation

       StepOne

    package cn.edu360.mr.index.sequence;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    
    public class IndexStepOne {
    
        public static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        // produces <hello-filename, 1> pairs
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // get, from the input split info, the file that the line currently being processed belongs to
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String fileName = inputSplit.getPath().getName();
    
                String[] words = value.toString().split(" ");
                for (String w : words) {
                    // 将"单词-文件名"作为key,1作为value,输出
                    context.write(new Text(w + "-" + fileName), new IntWritable(1));
                }
    
            }
        }
    
        public static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
    
                context.write(key, new IntWritable(count));
            }
        }
        
        
        public static void main(String[] args) throws Exception{
            
            Configuration conf = new Configuration(); 
            
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(IndexStepOne.class);
    
            job.setMapperClass(IndexStepOneMapper.class);
            job.setReducerClass(IndexStepOneReducer.class);
    
            job.setNumReduceTasks(3);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
        // job.setOutputFormatClass(TextOutputFormat.class);  // this is the default output format
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            
    
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\index\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\index\\out-seq-1"));
    
            job.waitForCompletion(true);
            
        }
    }

       StepTwo

    package cn.edu360.mr.index.sequence;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class IndexStepTwo {
    
        public static class IndexStepTwoMapper extends Mapper<Text, IntWritable, Text, Text> {
    
            @Override
            protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
                String[] split = key.toString().split("-");
                context.write(new Text(split[0]), new Text(split[1]+"-->"+value));
            }
        }
    
        public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {
    
        // one group of data:  <hello,a.txt-->4> <hello,b.txt-->4> <hello,c.txt-->4>
            @Override
            protected void reduce(Text key, Iterable<Text> values,Context context)
                    throws IOException, InterruptedException {
            // StringBuffer is thread-safe, StringBuilder is not; when thread safety is not a concern, StringBuilder is faster
                StringBuilder sb = new StringBuilder();
                
                for (Text value : values) {
                sb.append(value.toString()).append("\t");
                }
                
                context.write(key, new Text(sb.toString()));
            }
        }
        
        
        public static void main(String[] args) throws Exception{
            
        Configuration conf = new Configuration(); // by default only core-default.xml and core-site.xml are loaded
            
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(IndexStepTwo.class);
    
            job.setMapperClass(IndexStepTwoMapper.class);
            job.setReducerClass(IndexStepTwoReducer.class);
    
            job.setNumReduceTasks(1);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
        // job.setInputFormatClass(TextInputFormat.class); // the default input format
            job.setInputFormatClass(SequenceFileInputFormat.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\index\\out-seq-1"));   // step 1's SequenceFile output
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\index\\out2"));
    
            job.waitForCompletion(true);
            
        }
    }

    8. Data files for the examples

    The data files used by these cases (such as request.dat and the flow and order data) were provided as a download attachment with the original post.

  • Original article: https://www.cnblogs.com/tashanzhishi/p/10837325.html