  Big Data Primer, Day 9 - MapReduce in Detail (5): mapJoin, GroupingComparator, and More MR Examples

    I. Data Skew Analysis: mapJoin

      1. Background

        Continuing from the previous day's join example: there we solved the join on the reduce side by stitching records together by pid, which leads to the following situation:

    --order
    1001,20150710,P0001,2
    1002,20150710,P0001,3
    1002,20150710,P0002,3
    --product
    P0001,小米5,1000,2
    P0002,锤子T1,1000,3

        For example, if the Mi 5 sells much better than everything else (as of this writing the Mi 7 was about to launch), most of the order records flow to the reduce task handling P0001, while the reduce task handling P0002 (the Smartisan T1) sits almost idle. That is data skew!

        For more background on data skew, see http://blog.csdn.net/u010039929/article/details/55044407

        Here we use the simpler map-side join, which removes the need to stitch records together in a reduce phase: the join is completed directly in the mapper. For that to work, the mapper needs the full product table (a dictionary / lookup table) available in memory rather than read from the job input. MapReduce offers a neat solution for exactly this: the DistributedCache. For an introduction, see http://blog.csdn.net/lzm1340458776/article/details/42971075

        For more on how to use the distributed cache, see http://blog.csdn.net/qq1010885678/article/details/50751007

        Of course, the first thing to consult should be the official documentation.

      2. Code

    package com.mr.mapjoin;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/5
     **/
    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Map<String, String> infoMap = new HashMap<>();
        Text k = new Text();
        /**
         * Runs once before any map() calls; used here to load the cached
         * product dictionary (pid -> product name) into memory.
         * @param context the task context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
            String path = context.getCacheFiles()[0].getPath();
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split(",");
                // load the dictionary entry (pid -> product name) into the in-memory map
                infoMap.put(fields[0], fields[1]);
            }
            // close the stream
            br.close();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            // split the order record; the sample order data above is comma-separated: 1001,20150710,P0001,2
            String[] fields = orderLine.split(",");
            // the product id is the third field; look it up in the cached dictionary
            String pName = infoMap.get(fields[2]);
            k.set(orderLine + "\t" + pName);
            // emit the joined record; there is no reduce phase
            context.write(k, NullWritable.get());
        }
    }
    MapJoinMapper
    package com.mr.mapjoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/5
     **/
    public class MapJoinDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // initialize the Job
            Job job = Job.getInstance(conf);
            // job configuration
            job.setJarByClass(MapJoinDriver.class);
            job.setMapperClass(MapJoinMapper.class);
            // the reduce phase is skipped entirely (the join is finished on the map side)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // distributed cache options:
            /* job.addArchiveToClassPath(archive); */// add an archive (jar) to the classpath of the task nodes
            /* job.addFileToClassPath(file); */// add a plain file to the classpath of the task nodes
            /* job.addCacheArchive(uri); */// cache an archive into the working directory of the task nodes
            /* job.addCacheFile(uri) */// cache a plain file into the working directory of the task nodes
            job.addCacheFile(new URI("file:/F:/pd.txt"));
            // no reduce tasks are needed
            job.setNumReduceTasks(0);
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    MapJoinDriver
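
      Note that the file:/F:/pd.txt URI only works when the job runs in the local runner. On a real cluster the dictionary file is normally uploaded to HDFS first; a minimal sketch of that variant (the HDFS path and symlink name below are made-up examples, not from the original post):

    // The "#pd.txt" fragment asks Hadoop to create a symlink named pd.txt in each task's
    // working directory, so the mapper could also open it simply as new FileInputStream("pd.txt").
    job.addCacheFile(new URI("hdfs://mycluster/cache/pd.txt#pd.txt"));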

       More examples to be added later.

    II. Building an Inverted Index

      1. Requirements

        Requirement: there is a large volume of text (documents, web pages) for which a search index must be built.

        (Compare this with the earlier wordcount example and note what changes.)

       2. Approach

        Use the word together with the file name as the key, so that counts are aggregated per (word, file) pair.

      3. Code

    package com.mr.inverseindex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(" ");
            // get the name of the file this input split comes from
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            for (String field : fields) {
                k.set(field + "--" + fileName);
                context.write(k, v);
            }
        }
    }
    InverseIndexMapper
    package com.mr.inverseindex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        IntWritable v = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }
    InverseIndexReducer
    package com.mr.inverseindex;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // locate the jar that contains this program
            job.setJarByClass(InverseIndexDriver.class);
            // specify the mapper/reducer classes this job uses
            job.setMapperClass(InverseIndexMapper.class);
            job.setReducerClass(InverseIndexReducer.class);
            // specify the map output key/value types (required because the serialization framework is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // specify the final (reduce) output key/value types (optional when there is no reduce phase)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit the job (ships the job configuration and the jar to YARN to run)
            // job.submit();
            // wait for completion and report the status
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    InverseIndexDriver

        Output:

    hello--a.txt    3
    hello--b.txt    2
    hello--c.txt    2
    jerry--a.txt    1
    jerry--b.txt    3
    jerry--c.txt    1
    tom--a.txt    2
    tom--b.txt    1
    tom--c.txt    1

        As you can see, one pass is often not enough; several MapReduce jobs have to be chained together.

       Run a second MapReduce pass over this output:

    package cn.itcast.bigdata.mr.inverindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class IndexStepTwo {
        public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] files = line.split("--");
                context.write(new Text(files[0]), new Text(files[1]));
            }
        }
        public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                StringBuffer sb = new StringBuffer();
                for (Text text : values) {
                sb.append(text.toString().replace("\t", "-->") + "\t");
                }
                context.write(key, new Text(sb.toString()));
            }
        }
        public static void main(String[] args) throws Exception {
            
        if (args == null || args.length < 1) {
                args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
            }
            
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
            
            job.setMapperClass(IndexStepTwoMapper.class);
            job.setReducerClass(IndexStepTwoReducer.class);
    //        job.setMapOutputKeyClass(Text.class);
    //        job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    IndexStepTwo
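
       Given the stage-one output above, this second pass regroups by word; its result should look roughly like the following (the order of the file-->count pairs within a line depends on the order in which the reducer receives the values):

    hello    a.txt-->3    b.txt-->2    c.txt-->2
    jerry    a.txt-->1    b.txt-->3    c.txt-->1
    tom    a.txt-->2    b.txt-->1    c.txt-->1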

    To sum up these examples: the most important decision when using MapReduce is what to use as the key, because records with the same key are brought together at the reducer. The examples that follow are all built on this idea.

    III. Finding Common Friends

      1. Requirements

        Below is QQ friend-list data; before the colon is a user, and after it are all of that user's friends (the friendship relation in this data is one-directional):

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J

       2. Analysis

        As mentioned earlier, when a MapReduce problem cannot be solved cleanly in one pass, approach it step by step and solve it with multiple jobs.

    #1.1 Work out which users have C as a friend, i.e. whose common friend C is
    c --> a    b d f g
        #2.1 From that, derive the pairs for which C is a common friend
        a-b c
        a-d c
        a-g c
    #1.2 Likewise, work out whose friend D is
    d --> a e g h
        #2.2 Likewise, derive the pairs for which D is a common friend
        a-e d
        a-g d
    
    #3 Run another MapReduce over the step-2 output to obtain common-friend lists such as "a-g  c d"

      3. Stage-one code:

    package com.mr.fans;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(":");
            // split the user from the friend list, e.g. A:B,C,D
            String person = fields[0];
            String[] friends = fields[1].split(",");
            for (String friend : friends) {
                k.set(friend);
                v.set(person);
                // emit the pair <friend, user>
                context.write(k, v);
            }
        }
    }
    SharedFriendsMapper
    package com.mr.fans;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsReducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                sb.append(value).append("-");
            }
            // remove the trailing "-" separator
            sb.deleteCharAt(sb.length() - 1);
            v.set(sb.toString());
            context.write(key, v);
        }
    }
    SharedFriendsReducer
    package com.mr.fans;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsDriver {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // locate the jar that contains this program
            job.setJarByClass(SharedFriendsDriver.class);
            // specify the mapper/reducer classes this job uses
            job.setMapperClass(SharedFriendsMapper.class);
            job.setReducerClass(SharedFriendsReducer.class);
            // specify the map output key/value types (required because the serialization framework is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // specify the final (reduce) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // the job's input/output paths (hard-coded here; they could come from the command line instead)
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit the job (ships the job configuration and the jar to YARN to run)
            // job.submit();
            // wait for completion and report the status
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 :1);
        }
    }
    SharedFriendsDriver

      Stage-one output:

    A    I-K-C-B-G-F-H-O-D
    B    A-F-J-E
    C    A-E-B-H-F-G-K
    D    G-C-K-A-L-F-E-H
    E    G-M-L-H-A-F-B-D
    F    L-M-D-C-G-A
    G    M
    H    O
    I    O-C
    J    O
    K    B
    L    D-E
    M    E-F
    O    A-H-I-J-F

       A note here: the default separator written between the output key and value is a tab ("\t"); it can also be customized:

    conf.set("mapred.textoutputformat.separator", ";");

        For more on customizing the separator, see http://www.xuebuyuan.com/2132293.html
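
        For reference, mapred.textoutputformat.separator is the old Hadoop 1.x property name; on Hadoop 2.x the property read by TextOutputFormat is mapreduce.output.textoutputformat.separator (the old name is generally still honored as a deprecated alias). The value must end up in the configuration the job actually uses, roughly like this:

    Configuration conf = new Configuration();
    // set it before the Job is created ...
    conf.set("mapreduce.output.textoutputformat.separator", ";");
    Job job = Job.getInstance(conf);
    // ... or set it on the job's own configuration afterwards
    job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ";");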

       4. Stage-two code

    package com.mr.fans;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    /**
     * mapper step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input line looks like: A<TAB>B-C-D-E-F (key and value separated by the default tab)
            String line = value.toString();
            String[] fields = line.split("\t");
            // split the friend from the users who list that friend
            String friend = fields[0];
            String[] persons = fields[1].split("-");
            // sort so that each pair is always emitted in the same order (B-C, never C-B)
            Arrays.sort(persons);
            // emit every pair <persons[i], persons[j]> with i < j
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    k.set(persons[i] + "-" + persons[j]);
                    v.set(friend);
                    // emit <B-C, A>; all values for the same X-Y key meet in one reduce call
                    context.write(k, v);
                }
            }
        }
    }
    SharedFriendsMapper2
    package com.mr.fans;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                sb.append(value).append(",");
            }
            // remove the trailing "," separator
            sb.deleteCharAt(sb.length() - 1);
            v.set(sb.toString());
            context.write(key, v);
        }
    }
    SharedFriendsReducer2
    package com.mr.fans;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsDriver2 {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // locate the jar that contains this program
            job.setJarByClass(SharedFriendsDriver2.class);
            // specify the mapper/reducer classes this job uses
            job.setMapperClass(SharedFriendsMapper2.class);
            job.setReducerClass(SharedFriendsReducer2.class);
            // specify the map output key/value types (required because the serialization framework is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // specify the final (reduce) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // the stage-one output is the input of this job (hard-coded here; could come from the command line)
            FileInputFormat.setInputPaths(job, new Path("F:/output"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output2"));
            // submit the job (ships the job configuration and the jar to YARN to run)
            // job.submit();
            // wait for completion and report the status
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 :1);
        }
    }
    SharedFriendsDriver2

      Partial example output:

    A-B    C,E
    A-C    D,F
    A-D    E,F
    A-E    B,C,D
    A-F    B,C,D,E,O
    A-G    C,D,E,F
    A-H    C,D,E,O
    A-I    O
    A-J    B,O
    A-K    C,D
    A-L    D,E,F

    IV. Web Log Preprocessing

      1. Requirements:

        Recognize and split the individual fields of the web access log records.

        Remove invalid records from the log.

        Generate the filtered request data needed by the various KPI statistics.

      Sample log lines:

    194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
    163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"

         In short: clean the raw logs according to the business rules.

       2. Code

        We will not go deeper here; the core code is given below:

    public class WebLogBean {
        
        private String remote_addr;// client IP address
        private String remote_user;// client user name; "-" when not recorded
        private String time_local;// access time and time zone
        private String request;// requested url and http protocol
        private String status;// response status; 200 means success
        private String body_bytes_sent;// size of the response body sent to the client
        private String http_referer;// the page the request was linked from
        private String http_user_agent;// information about the client browser
    
        private boolean valid = true;// whether the record is valid
    
        
        
        public String getRemote_addr() {
            return remote_addr;
        }
    
        public void setRemote_addr(String remote_addr) {
            this.remote_addr = remote_addr;
        }
    
        public String getRemote_user() {
            return remote_user;
        }
    
        public void setRemote_user(String remote_user) {
            this.remote_user = remote_user;
        }
    
        public String getTime_local() {
            return time_local;
        }
    
        public void setTime_local(String time_local) {
            this.time_local = time_local;
        }
    
        public String getRequest() {
            return request;
        }
    
        public void setRequest(String request) {
            this.request = request;
        }
    
        public String getStatus() {
            return status;
        }
    
        public void setStatus(String status) {
            this.status = status;
        }
    
        public String getBody_bytes_sent() {
            return body_bytes_sent;
        }
    
        public void setBody_bytes_sent(String body_bytes_sent) {
            this.body_bytes_sent = body_bytes_sent;
        }
    
        public String getHttp_referer() {
            return http_referer;
        }
    
        public void setHttp_referer(String http_referer) {
            this.http_referer = http_referer;
        }
    
        public String getHttp_user_agent() {
            return http_user_agent;
        }
    
        public void setHttp_user_agent(String http_user_agent) {
            this.http_user_agent = http_user_agent;
        }
    
        public boolean isValid() {
            return valid;
        }
    
        public void setValid(boolean valid) {
            this.valid = valid;
        }
        
        
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.valid);
            // "\001" (the SOH control character) separates the fields of the cleaned record
            sb.append("\001").append(this.remote_addr);
            sb.append("\001").append(this.remote_user);
            sb.append("\001").append(this.time_local);
            sb.append("\001").append(this.request);
            sb.append("\001").append(this.status);
            sb.append("\001").append(this.body_bytes_sent);
            sb.append("\001").append(this.http_referer);
            sb.append("\001").append(this.http_user_agent);
            return sb.toString();
        }
    }
    WebLogBean (the log bean)
    public class WebLogParser {
        public static WebLogBean parser(String line) {
            WebLogBean webLogBean = new WebLogBean();
            String[] arr = line.split(" ");
            if (arr.length > 11) {
                webLogBean.setRemote_addr(arr[0]);
                webLogBean.setRemote_user(arr[1]);
                webLogBean.setTime_local(arr[3].substring(1));
                webLogBean.setRequest(arr[6]);
                webLogBean.setStatus(arr[8]);
                webLogBean.setBody_bytes_sent(arr[9]);
                webLogBean.setHttp_referer(arr[10]);
                
                if (arr.length > 12) {
                    webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
                } else {
                    webLogBean.setHttp_user_agent(arr[11]);
                }
                if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// a status of 400 or above indicates an HTTP error
                    webLogBean.setValid(false);
                }
            } else {
                webLogBean.setValid(false);
            }
            return webLogBean;
        }
       
        public static String parserTime(String time) {
            // String.replace returns a new string; return it rather than discarding the result
            return time.replace("/", "-");
        }
    }
    WebLogParser (the raw-log parser)
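
        As a quick local sanity check (not part of the original post), the parser can be exercised on the first sample log line shown earlier; a minimal sketch:

    // Hypothetical smoke test for WebLogParser, using the first sample line from the log excerpt above.
    public class WebLogParserSmokeTest {
        public static void main(String[] args) {
            String line = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "
                    + "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
            WebLogBean bean = WebLogParser.parser(line);
            // expected: valid=true, status=304, request=/wp-content/uploads/2013/07/rstudio-git3.png
            System.out.println(bean.isValid() + " " + bean.getStatus() + " " + bean.getRequest());
        }
    }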
    public class WeblogPreProcess {
    
        static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            Text k = new Text();
            NullWritable v = NullWritable.get();
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
                WebLogBean webLogBean = WebLogParser.parser(line);
                if (!webLogBean.isValid())
                    return;
                k.set(webLogBean.toString());
                context.write(k, v);
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(WeblogPreProcess.class);
            
            job.setMapperClass(WeblogPreProcessMapper.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.waitForCompletion(true);
            
        }
    }
    WeblogPreProcess (the MapReduce program)

         The string and date/time handling here needs to be strengthened. For example:
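
        The time_local field ("18/Sep/2013:06:49:18 +0000") can be parsed properly and reformatted instead of a bare replace; a minimal sketch, assuming the English month abbreviations used in these logs:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    
    // Sketch only: parse the nginx/apache time_local format and re-emit it as yyyy-MM-dd HH:mm:ss.
    // SimpleDateFormat is not thread-safe; static fields are fine for a single-threaded mapper,
    // otherwise create one per call.
    public class LogTimeParser {
        private static final SimpleDateFormat IN =
                new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
        private static final SimpleDateFormat OUT =
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        public static String parserTime(String timeLocal) {
            try {
                Date d = IN.parse(timeLocal);      // e.g. "18/Sep/2013:06:49:18 +0000"
                return OUT.format(d);
            } catch (ParseException e) {
                return "-invalid_time-";           // keep the record but flag the bad field
            }
        }
    
        public static void main(String[] args) {
            System.out.println(parserTime("18/Sep/2013:06:49:18 +0000"));
        }
    }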

    V. Custom GroupingComparator

      1. Requirements

        We have a set of orders (the original post showed the sample order table as an image; a hypothetical sample in the same format is given below).

        The task: for each order, find the single transaction with the largest amount.
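
        A hypothetical sample in the comma-separated format the mapper below expects (order id, product id, amount); it is illustrative only, not the original table:

    Order_0000001,Pdt_01,222.8
    Order_0000001,Pdt_05,25.8
    Order_0000002,Pdt_03,522.8
    Order_0000002,Pdt_04,122.4
    Order_0000002,Pdt_05,722.4
    Order_0000003,Pdt_01,222.8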

      2. Analysis

          1. Use the pair "order id + amount" as the key, so that the map output can be partitioned by order id and sorted by amount on its way to the reducer.

        2. On the reduce side, use a GroupingComparator to group the key-value pairs whose order id is the same; the first key in each group is then the one with the largest amount.

      Note that this is not a conventional pattern; we are simply taking advantage of one of the framework's mechanisms.

       By default the grouping WritableComparator decides whether two keys belong to the same group by comparing the whole key, which determines the groups seen by reduce; all we need to do is override that comparison so it looks at the order id only.

       3. Code

      Core code:

        The comparator: here we extend a convenient base implementation, WritableComparator:

    public class OrderComparator extends WritableComparator{
        protected OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // cast down to the concrete key type
            OrderBean bean1 = (OrderBean) a;
            OrderBean bean2 = (OrderBean) b;
            // keys with the same item id are treated as one group
            return bean1.getItemId().compareTo(bean2.getItemId());
        }
    }
    /**
     * Custom grouping comparator
     * @author 廖*民
     * time : 2015年1月18日下午9:15:26
     * @version
     */
    class MyGroupComparator implements RawComparator<CombineKey> {

        // for grouping, this object-level compare is not the one that matters
        public int compare(CombineKey o1, CombineKey o2) {
            return 0;
        }

        /**
         * b1: the first byte array taking part in the comparison
         * s1: the offset in the first array where the comparison starts
         * l1: the number of bytes of the first array taking part in the comparison
         * b2: the second byte array taking part in the comparison
         * s2: the offset in the second array where the comparison starts
         * l2: the number of bytes of the second array taking part in the comparison
         */
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // group on the first field of CombineKey; it is a long, so compare the first 8 bytes only
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }
    Implementing RawComparator directly

        The advantage of the raw form is that grouping is decided on the serialized bytes, without deserializing the keys first.

        The comparison logic in OrderBean:

      @Override
        public int compareTo(OrderBean o) {
        // comparison logic: if the ids are equal, compare amounts in descending order; otherwise compare the ids, so records with the same id end up adjacent
            int cmp = this.getItemId().compareTo(o.getItemId());
            if (0 == cmp) {
                return -Double.compare(this.getAmount(), o.getAmount());
            }
            return cmp;
        }

        Register the grouping comparator on the job:

         // key step: register the grouping comparator
            job.setGroupingComparatorClass(OrderComparator.class);

      Complete code:

    package com.mr.group;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        OrderBean bean = new OrderBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            String itemId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            bean.set(itemId, amount);
            context.write(bean, NullWritable.get());
        }
    }
    GroupMapper
    package com.mr.group;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // with the custom grouping comparator, all records sharing an order id form one group here;
            // the first key in the group carries the largest amount, so writing it gives the result
            context.write(key, NullWritable.get());
        }
    }
    GroupReducer
    package com.mr.group;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * compator
     *
     * @author zcc ON 2018/2/6
     **/
    public class OrderComparator extends WritableComparator{
        protected OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // cast down to the concrete key type
            OrderBean bean1 = (OrderBean) a;
            OrderBean bean2 = (OrderBean) b;
            // keys with the same item id are treated as one group
            return bean1.getItemId().compareTo(bean2.getItemId());
        }
    }
    OrderComparator
    package com.mr.group;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * bean
     *
     * @author zcc ON 2018/2/6
     **/
    public class OrderBean implements WritableComparable<OrderBean>{
        private String itemId;
        private Double amount;
    
        public OrderBean() {
        }
    
        public OrderBean(String itemId, Double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
        public void set(String itemId, Double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
        public String getItemId() {
            return itemId;
        }
    
        public void setItemId(String itemId) {
            this.itemId = itemId;
        }
    
        public Double getAmount() {
            return amount;
        }
    
        public void setAmount(Double amount) {
            this.amount = amount;
        }
    
        @Override
        public int compareTo(OrderBean o) {
            // comparison logic: if the ids are equal, compare amounts in descending order; otherwise compare the ids, so records with the same id sort next to each other
            int cmp = this.getItemId().compareTo(o.getItemId());
            if (0 == cmp) {
                return -Double.compare(this.getAmount(), o.getAmount());
            }
            return cmp;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemId);
            out.writeDouble(amount);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.itemId = in.readUTF();
            this.amount = in.readDouble();
        }
    
        @Override
        public String toString() {
            return itemId + "\t" + amount;
        }
    }
    OrderBean
    package com.mr.group;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // locate the jar that contains this program
            job.setJarByClass(GroupDriver.class);
            // specify the mapper/reducer classes this job uses
            job.setMapperClass(GroupMapper.class);
            job.setReducerClass(GroupReducer.class);
            // specify the map output key/value types (required because the serialization framework is pluggable)
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            // specify the final (reduce) output key/value types
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            // key step: register the custom grouping comparator
            job.setGroupingComparatorClass(OrderComparator.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit the job (ships the job configuration and the jar to YARN to run)
            // job.submit();
            // wait for completion and report the status
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    GroupDriver
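
      One caveat: the driver above relies on the default single reduce task. If job.setNumReduceTasks() were raised, the default HashPartitioner would hash the whole OrderBean (which does not override hashCode), so beans with the same itemId could land on different reducers and the grouping trick would break. A custom partitioner keyed on itemId only would then be needed; a minimal sketch (this class is an addition for illustration, not part of the original post):

    package com.mr.group;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Route records by itemId only, so every record of one order reaches the same reducer
     * even when more than one reduce task is configured.
     */
    public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable> {
        @Override
        public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
            // same itemId -> same partition
            return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

      It would be registered in the driver with job.setPartitionerClass(OrderIdPartitioner.class) alongside the grouping comparator.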