Big Data Primer Day 9 - MapReduce in Detail (5): mapJoin, GroupingComparator, and More MR Examples

    I. Data Skew Analysis: mapJoin

      1. Background

        Continuing from the previous day's join algorithm: there we solved the join on the reduce side, stitching order and product records together by pid. With that approach:

    --order
    1001,20150710,P0001,2
    1002,20150710,P0001,3
    1002,20150710,P0002,3
    --product
    P0001,小米5,1000,2
    P0002,锤子T1,1000,3

        Suppose the Xiaomi 5 in the orders sells particularly well (as of this writing, the Mi 7 is about to launch). Then most of the data flows to the reducer handling P0001, while the reducer for P0002 (the Smartisan T1) sits nearly idle. That is data skew!

        For more on data skew, see http://blog.csdn.net/u010039929/article/details/55044407

        Here we use the much simpler map-side join, which needs no reducer to stitch records together: each record is joined right in the map phase. For that to work, every mapper must hold the complete product table (a dictionary table) in memory instead of reading it as job input. MapReduce provides an excellent mechanism for exactly this: the DistributedCache. For an introduction, see http://blog.csdn.net/lzm1340458776/article/details/42971075

        For usage of the distributed cache, see http://blog.csdn.net/qq1010885678/article/details/50751007

        Of course, the first thing to consult should be the official documentation.

      2. Code

    package com.mr.mapjoin;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/5
     **/
    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Map<String, String> infoMap = new HashMap<>();
        Text k = new Text();
        /**
         * Necessary initialization before any map() call runs: load the product dictionary
         * @param context the task context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // alternative: read through the symlink in the task's working directory
            // BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
            String path = context.getCacheFiles()[0].getPath();
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
            String line;
            while ((line = br.readLine()) != null) {
                // skip blank lines; stop only at end of file
                if (line.isEmpty()) {
                    continue;
                }
                String[] fields = line.split(",");
                // load the dictionary (pid -> product name) into memory
                infoMap.put(fields[0], fields[1]);
            }
            // close the stream
            br.close();
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            // split the order record (tab-separated here)
            String[] fields = orderLine.split("\t");
            // look up the product name; the pid is assumed to be the second field
            String pName = infoMap.get(fields[1]);
            k.set(orderLine + "\t" + pName);
            // write the joined line out; no reduce phase follows
            context.write(k, NullWritable.get());
        }
    }
    MapJoinMapper
    package com.mr.mapjoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.net.URI;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/5
     **/
    public class MapJoinDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // initialize the Job
            Job job = Job.getInstance(conf);
            // job configuration
            job.setJarByClass(MapJoinDriver.class);
            job.setMapperClass(MapJoinMapper.class);
            // the reduce phase is skipped entirely (the join finishes on the map side)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // distributed-cache options:
            /* job.addArchiveToClassPath(archive); */// cache a jar onto the task nodes' classpath
            /* job.addFileToClassPath(file); */// cache an ordinary file onto the task nodes' classpath
            /* job.addCacheArchive(uri); */// cache an archive into the tasks' working directory
            /* job.addCacheFile(uri) */// cache an ordinary file into the tasks' working directory
            job.addCacheFile(new URI("file:/F:/pd.txt"));
            // no reduce tasks needed
            job.setNumReduceTasks(0);
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    MapJoinDriver
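
       A usage note: the file:/F:/pd.txt URI above only works for a local run. On a real cluster the cached file must live on a shared filesystem such as HDFS, and a #name fragment on the URI makes Hadoop create a symlink of that name in each task's working directory, which is exactly what the commented-out new FileInputStream("pd.txt") line in setup() relies on. A minimal sketch (the HDFS path is hypothetical):

    // hypothetical HDFS location for the dictionary; the "#pd.txt" fragment
    // creates a symlink named pd.txt in each task's working directory
    job.addCacheFile(new URI("hdfs://mycluster/cache/pd.txt#pd.txt"));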

       More to come.

    II. Building an Inverted Index

       1. Requirement

          Requirement: there is a large volume of text (documents, web pages) for which a search index must be built.

          // compare with wordcount to see what changes

         2. Approach

          Use the word together with the file name as the key: pass one is essentially wordcount with word--filename as the key, and pass two regroups those counts by word, so each word ends up listing the files it appears in along with the counts.

        3. Code

    package com.mr.inverseindex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        Text k = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(" ");
            // get the name of the file this split comes from
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            for (String field : fields) {
                k.set(field + "--" + fileName);
                context.write(k, v);
            }
        }
    }
    InverseIndexMapper
    package com.mr.inverseindex;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        IntWritable v = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }
    InverseIndexReducer
    package com.mr.inverseindex;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/5
     **/
    public class InverseIndexDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // set the jar that contains this program
            job.setJarByClass(InverseIndexDriver.class);
            // specify the mapper/reducer classes for this job
            job.setMapperClass(InverseIndexMapper.class);
            job.setReducerClass(InverseIndexReducer.class);
            // declare the map output types (needed because serialization is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // declare the final (reduce) output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit (ship the job configuration and the jar to YARN)
            // job.submit();
            // wait for completion, printing progress from the cluster
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    InverseIndexDriver

        Output:

    hello--a.txt    3
    hello--b.txt    2
    hello--c.txt    2
    jerry--a.txt    1
    jerry--b.txt    3
    jerry--c.txt    1
    tom--a.txt    2
    tom--b.txt    1
    tom--c.txt    1

        As you can see, a single pass often cannot solve the whole problem; multiple chained MapReduce jobs must work together!

       Run a second MapReduce over the first round's output, as shown below; a combined driver that chains both passes follows the listing.

    package cn.itcast.bigdata.mr.inverindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class IndexStepTwo {
        public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] files = line.split("--");
                context.write(new Text(files[0]), new Text(files[1]));
            }
        }
        public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                StringBuffer sb = new StringBuffer();
                for (Text text : values) {
                sb.append(text.toString().replace("\t", "-->") + "\t");
                }
                context.write(key, new Text(sb.toString()));
            }
        }
        public static void main(String[] args) throws Exception {
            
        // check for null before dereferencing args
        if (args == null || args.length < 1) {
            args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
        }
            
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
            
            job.setMapperClass(IndexStepTwoMapper.class);
            job.setReducerClass(IndexStepTwoReducer.class);
    //        job.setMapOutputKeyClass(Text.class);
    //        job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
        // exit 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    IndexStepTwo
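
       Chaining the two passes by hand (run job one, then point job two at its output directory) is common enough that one driver can do it all. A minimal sketch under the assumptions of this section: the classes are the ones listed above, and the paths are placeholders:

    package com.mr.inverseindex;
    
    import cn.itcast.bigdata.mr.inverindex.IndexStepTwo;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Runs both inverted-index passes back to back.
     */
    public class InverseIndexChainDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            // pass one: count word--filename occurrences
            Job job1 = Job.getInstance(conf, "inverse-index-step1");
            job1.setJarByClass(InverseIndexChainDriver.class);
            job1.setMapperClass(InverseIndexMapper.class);
            job1.setReducerClass(InverseIndexReducer.class);
            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(IntWritable.class);
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job1, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job1, new Path("F:/step1-out"));
            // do not start pass two unless pass one succeeded
            if (!job1.waitForCompletion(true)) {
                System.exit(1);
            }
    
            // pass two: regroup by word, consuming pass one's output directory
            Job job2 = Job.getInstance(conf, "inverse-index-step2");
            job2.setJarByClass(InverseIndexChainDriver.class);
            job2.setMapperClass(IndexStepTwo.IndexStepTwoMapper.class);
            job2.setReducerClass(IndexStepTwo.IndexStepTwoReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job2, new Path("F:/step1-out"));
            FileOutputFormat.setOutputPath(job2, new Path("F:/step2-out"));
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }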

    So, to sum up across these examples, the biggest key to using MapReduce well is deciding what to use as the key, since records with the same key are grouped into a single reduce call. The examples that follow all use this same line of thought!

    III. Finding Common Friends

      1. Requirement

        Below is QQ friend-list data. Before each colon is a user; after it are all of that user's friends (the friendship in this data is one-directional):

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J

       2. Analysis

        As mentioned earlier, some MapReduce problems are hard to crack in one step; we can close in on the answer gradually over several passes!

    #1.1 Find everyone who has C as a friend, i.e. the people for whom C is a common friend
    c --> a b d f g
        #2.1 From that, derive the pairs that share friend C
        a-b c
        a-d c
        a-g c
    #1.2 Likewise, find the people for whom D is a common friend
    d --> a e g h
        #2.2 Likewise, derive the pairs that share friend D
        a-e d
        a-g d
    
    #3 Run a second MapReduce over the step-2 output to obtain common-friend lists such as "a-g  c d"!

      3. Stage-one code:

    package com.mr.fans;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(":");
            // split into the user and the friend list, e.g. A:B,C,D
            String person = fields[0];
            String[] friends = fields[1].split(",");
            for (String friend : friends) {
                k.set(friend);
                v.set(person);
                // emit a <friend, user> pair
                context.write(k, v);
            }
            }
        }
    }
    SharedFriendsMapper
    package com.mr.fans;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsReducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                sb.append(value).append("-");
            }
            // drop the trailing separator (-)
            sb.deleteCharAt(sb.length() - 1);
            v.set(sb.toString());
            context.write(key, v);
        }
    }
    SharedFriendsReducer
    package com.mr.fans;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsDriver {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // set the jar that contains this program
            job.setJarByClass(SharedFriendsDriver.class);
            // specify the mapper/reducer classes for this job
            job.setMapperClass(SharedFriendsMapper.class);
            job.setReducerClass(SharedFriendsReducer.class);
            // declare the map output types (needed because serialization is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // declare the final (reduce) output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // input/output directories (could be taken from args instead of hard-coded)
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit (ship the job configuration and the jar to YARN)
            // job.submit();
            // wait for completion, printing progress from the cluster
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    SharedFriendsDriver

      Stage-one result:

    A    I-K-C-B-G-F-H-O-D
    B    A-F-J-E
    C    A-E-B-H-F-G-K
    D    G-C-K-A-L-F-E-H
    E    G-M-L-H-A-F-B-D
    F    L-M-D-C-G-A
    G    M
    H    O
    I    O-C
    J    O
    K    B
    L    D-E
    M    E-F
    O    A-H-I-J-F

       A note here: on inspection, the default separator between the output key and value is the tab character "\t"; we can also define our own:

    conf.set("mapred.textoutputformat.separator", ";");

        For more on custom separators, see http://www.xuebuyuan.com/2132293.html

       4. Stage-two code

    package com.mr.fans;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    /**
     * mapper step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
        Text k = new Text();
        Text v = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input line: A\tB-C-D-E-F (tab is the default key/value separator)
            String line = value.toString();
            String[] fields = line.split("\t");
            // split into the shared friend and the users who have that friend
            String friend = fields[0];
            String[] persons = fields[1].split("-");
            // sort so every unordered pair always comes out as X-Y, never Y-X
            Arrays.sort(persons);
            // enumerate all pairs: for [A,B,C] that is A-B, A-C, B-C
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    k.set(persons[i] + "-" + persons[j]);
                    v.set(friend);
                    // emit <X-Y, friend>; identical pairs meet in the same reduce call
                    context.write(k, v);
                }
            }
        }
    }
    SharedFriendsMapper2
    package com.mr.fans;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                sb.append(value).append(",");
            }
            // drop the trailing comma
            sb.deleteCharAt(sb.length() - 1);
            v.set(sb.toString());
            context.write(key, v);
        }
    }
    SharedFriendsReducer2
    package com.mr.fans;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * step2
     *
     * @author zcc ON 2018/2/6
     **/
    public class SharedFriendsDriver2 {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // set the jar that contains this program
            job.setJarByClass(SharedFriendsDriver2.class);
            // specify the mapper/reducer classes for this job
            job.setMapperClass(SharedFriendsMapper2.class);
            job.setReducerClass(SharedFriendsReducer2.class);
            // declare the map output types (needed because serialization is pluggable)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // declare the final (reduce) output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // stage two reads stage one's output directory
            FileInputFormat.setInputPaths(job, new Path("F:/output"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output2"));
            // submit (ship the job configuration and the jar to YARN)
            // job.submit();
            // wait for completion, printing progress from the cluster
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    SharedFriendsDriver2

      Partial result:

    A-B    C,E
    A-C    F,D
    A-D    E,F
    A-E    B,C,D
    A-F    C,D,B,E,O
    A-G    D,E,F,C
    A-H    E,O,C,D
    A-I    O
    A-K    D
    A-L    F,E

    IV. Web Log Preprocessing

      1. Requirements:

        Identify and split each field of the web access log records.

        Remove invalid records from the log.

        Generate the filtered request data needed by the various KPI statistics.

      Sample log lines:

    194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
    163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
    163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"

         In short, this is log cleansing driven by the business rules.

       2. Code

        We won't dig deeper here; the core code:

    public class WebLogBean {
        
        private String remote_addr;// client IP address
        private String remote_user;// client user name; "-" when absent
        private String time_local;// access time and time zone
        private String request;// requested url and http protocol
        private String status;// request status; 200 means success
        private String body_bytes_sent;// size of the response body sent to the client
        private String http_referer;// the page the visitor came from
        private String http_user_agent;// client browser information
    
        private boolean valid = true;// whether the record is valid
    
        
        
        public String getRemote_addr() {
            return remote_addr;
        }
    
        public void setRemote_addr(String remote_addr) {
            this.remote_addr = remote_addr;
        }
    
        public String getRemote_user() {
            return remote_user;
        }
    
        public void setRemote_user(String remote_user) {
            this.remote_user = remote_user;
        }
    
        public String getTime_local() {
            return time_local;
        }
    
        public void setTime_local(String time_local) {
            this.time_local = time_local;
        }
    
        public String getRequest() {
            return request;
        }
    
        public void setRequest(String request) {
            this.request = request;
        }
    
        public String getStatus() {
            return status;
        }
    
        public void setStatus(String status) {
            this.status = status;
        }
    
        public String getBody_bytes_sent() {
            return body_bytes_sent;
        }
    
        public void setBody_bytes_sent(String body_bytes_sent) {
            this.body_bytes_sent = body_bytes_sent;
        }
    
        public String getHttp_referer() {
            return http_referer;
        }
    
        public void setHttp_referer(String http_referer) {
            this.http_referer = http_referer;
        }
    
        public String getHttp_user_agent() {
            return http_user_agent;
        }
    
        public void setHttp_user_agent(String http_user_agent) {
            this.http_user_agent = http_user_agent;
        }
    
        public boolean isValid() {
            return valid;
        }
    
        public void setValid(boolean valid) {
            this.valid = valid;
        }
        
        
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.valid);
            // fields are joined with the \001 control character (Hive's default field delimiter)
            sb.append("\001").append(this.remote_addr);
            sb.append("\001").append(this.remote_user);
            sb.append("\001").append(this.time_local);
            sb.append("\001").append(this.request);
            sb.append("\001").append(this.status);
            sb.append("\001").append(this.body_bytes_sent);
            sb.append("\001").append(this.http_referer);
            sb.append("\001").append(this.http_user_agent);
            return sb.toString();
        }
    }
    WebLogBean
    public class WebLogParser {
        public static WebLogBean parser(String line) {
            WebLogBean webLogBean = new WebLogBean();
            String[] arr = line.split(" ");
            if (arr.length > 11) {
                webLogBean.setRemote_addr(arr[0]);
                webLogBean.setRemote_user(arr[1]);
                webLogBean.setTime_local(arr[3].substring(1));
                webLogBean.setRequest(arr[6]);
                webLogBean.setStatus(arr[8]);
                webLogBean.setBody_bytes_sent(arr[9]);
                webLogBean.setHttp_referer(arr[10]);
                
                if (arr.length > 12) {
                    webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
                } else {
                    webLogBean.setHttp_user_agent(arr[11]);
                }
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// status 400 or above means an HTTP error
                    webLogBean.setValid(false);
                }
            } else {
                webLogBean.setValid(false);
            }
            return webLogBean;
        }
       
        public static String parserTime(String time) {
            // String.replace returns a new string, so the result must be returned
            return time.replace("/", "-");
        }
    }
    WebLogParser (raw log parsing)
    public class WeblogPreProcess {
    
        static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            Text k = new Text();
            NullWritable v = NullWritable.get();
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
                WebLogBean webLogBean = WebLogParser.parser(line);
                if (!webLogBean.isValid())
                    return;
                k.set(webLogBean.toString());
                context.write(k, v);
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(WeblogPreProcess.class);
            
            job.setMapperClass(WeblogPreProcessMapper.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.waitForCompletion(true);
            
        }
    }
    WeblogPreProcess (the MapReduce program)

         String and date/time handling like this deserves to be done more rigorously!
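
        For example, the time_local field ("18/Sep/2013:06:49:18 +0000") can be parsed into a real date and reformatted, instead of being string-replaced. A small sketch, assuming we normalize to yyyy-MM-dd HH:mm:ss (the class and method names here are made up for illustration):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    
    public class LogTimeParser {
        // matches the nginx/apache time_local layout, e.g. 18/Sep/2013:06:49:18 +0000;
        // Locale.US is required for the English month abbreviation
        private static final SimpleDateFormat IN =
                new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
        private static final SimpleDateFormat OUT =
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        // SimpleDateFormat is not thread-safe, but a map task is single-threaded
        public static String normalize(String timeLocal) {
            try {
                Date d = IN.parse(timeLocal);
                return OUT.format(d);
            } catch (ParseException e) {
                // flag the bad timestamp instead of crashing the task
                return "invalid_time";
            }
        }
    
        public static void main(String[] args) {
            // the printed hour depends on the JVM's default time zone
            System.out.println(normalize("18/Sep/2013:06:49:18 +0000"));
        }
    }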

    V. Custom GroupingComparator

      1. Requirement

        Given orders like the following:

        (order table shown as an image in the original post: each record carries an order id, a product id, and a transaction amount)

        We now need to find, for each order, the single transaction with the largest amount.

      2. Analysis

          1. Use "order id + amount" together as the key. The map output can then be partitioned by order id and sorted by amount on its way to the reducers.

        2. On the reduce side, use a GroupingComparator to gather the k-v pairs with the same order id into one group; the first key of each group is then the maximum.

      Note that this is not a conventional pattern; we are merely exploiting one of the framework's mechanisms!

       By default, the grouping WritableComparator decides whether two keys belong to the same reduce group by comparing the whole key's contents. All we need to do is swap in our own comparison!
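
       One caveat worth spelling out: grouping happens inside a single reduce task, so if the job ran with more than one reducer, the records of one order id would also have to be partitioned by order id alone, or they could land on different reducers. The original code runs with the defaults; a sketch of such a partitioner (not part of the original code, and it assumes the OrderBean below):

    package com.mr.group;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
        @Override
        public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
            // partition on the order id only, ignoring the amount, so all
            // records of one order reach the same reduce task
            return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

       It would be registered in the driver with job.setPartitionerClass(ItemIdPartitioner.class).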

       3. Code

      Core code:

        The comparator: here we extend WritableComparator, one of its common base implementations:

    public class OrderComparator extends WritableComparator{
        protected OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // cast down to the concrete key type
            OrderBean bean1 = (OrderBean) a;
            OrderBean bean2 = (OrderBean) b;
            // keys with the same order id count as one group
            return bean1.getItemId().compareTo(bean2.getItemId());
        }
        }
    }
    /**
     * A custom grouping comparator implementing RawComparator directly
     * @author 廖*民
     * time : 2015年1月18日下午9:15:26
     * @version
     */
    class MyGroupComparator implements RawComparator<CombineKey> {
    
        // for grouping, this object-level compare is not the method that matters
        public int compare(CombineKey o1, CombineKey o2) {
            return 0;
        }
    
        /**
         * b1: the first byte array taking part in the comparison
         * s1: the offset in the first array where the comparison starts
         * l1: the number of bytes of the first array to compare
         * b2: the second byte array taking part in the comparison
         * s2: the offset in the second array where the comparison starts
         * l2: the number of bytes of the second array to compare
         */
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // comparing raw serialized bytes avoids deserializing the keys;
            // group on CombineKey's first element, a long, hence 8 bytes
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }
    Implementing RawComparator directly

        OrderBean's comparison logic:

      @Override
        public int compareTo(OrderBean o) {
        // comparison logic: equal ids fall back to amount (descending); otherwise compare ids, so records of one order sort together, largest amount first
            int cmp = this.getItemId().compareTo(o.getItemId());
            if (0 == cmp) {
                return -Double.compare(this.getAmount(), o.getAmount());
            }
            return cmp;
        }

        Set the grouping comparator on the job:

         // the key step: register the grouping comparator
            job.setGroupingComparatorClass(OrderComparator.class);

      Complete code:

    package com.mr.group;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * mapper
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        OrderBean bean = new OrderBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            String itemId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            bean.set(itemId, amount);
            context.write(bean, NullWritable.get());
        }
    }
    GroupMapper
    package com.mr.group;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * reducer
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // with the custom grouping in place, each call sees one order id's group; the first key is the maximum
            context.write(key, NullWritable.get());
        }
    }
    GroupReducer
    package com.mr.group;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * comparator
     *
     * @author zcc ON 2018/2/6
     **/
    public class OrderComparator extends WritableComparator{
        protected OrderComparator() {
            super(OrderBean.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // cast down to the concrete key type
            OrderBean bean1 = (OrderBean) a;
            OrderBean bean2 = (OrderBean) b;
            // keys with the same order id count as one group
            return bean1.getItemId().compareTo(bean2.getItemId());
        }
    }
    OrderComparator
    package com.mr.group;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * bean
     *
     * @author zcc ON 2018/2/6
     **/
    public class OrderBean implements WritableComparable<OrderBean>{
        private String itemId;
        private Double amount;
    
        public OrderBean() {
        }
    
        public OrderBean(String itemId, Double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
        public void set(String itemId, Double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
        public String getItemId() {
            return itemId;
        }
    
        public void setItemId(String itemId) {
            this.itemId = itemId;
        }
    
        public Double getAmount() {
            return amount;
        }
    
        public void setAmount(Double amount) {
            this.amount = amount;
        }
    
        @Override
        public int compareTo(OrderBean o) {
            // comparison logic: equal ids fall back to amount (descending); otherwise compare ids, so records of one order sort together, largest amount first
            int cmp = this.getItemId().compareTo(o.getItemId());
            if (0 == cmp) {
                return -Double.compare(this.getAmount(), o.getAmount());
            }
            return cmp;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemId);
            out.writeDouble(amount);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.itemId = in.readUTF();
            this.amount = in.readDouble();
        }
    
        @Override
        public String toString() {
            return itemId + "\t" + amount;
        }
    }
    OrderBean
    package com.mr.group;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * driver
     *
     * @author zcc ON 2018/2/6
     **/
    public class GroupDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // set the jar that contains this program
            job.setJarByClass(GroupDriver.class);
            // specify the mapper/reducer classes for this job
            job.setMapperClass(GroupMapper.class);
            job.setReducerClass(GroupReducer.class);
            // declare the map output types (needed because serialization is pluggable)
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            // declare the final (reduce) output types
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            // the key step: register the grouping comparator
            job.setGroupingComparatorClass(OrderComparator.class);
            FileInputFormat.setInputPaths(job, new Path("F:/input"));
            FileOutputFormat.setOutputPath(job, new Path("F:/output"));
            // submit (ship the job configuration and the jar to YARN)
            // job.submit();
            // wait for completion, printing progress from the cluster
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
    GroupDriver
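
      To make the mechanism concrete, here is a hypothetical input in the format GroupMapper expects (order id, product id, amount, comma-separated) and the output the job would produce, one line per order with its largest amount:

    --input (hypothetical)
    Order_0000001,Pdt_01,222.8
    Order_0000001,Pdt_05,25.8
    Order_0000002,Pdt_03,522.8
    Order_0000002,Pdt_04,122.4
    --output
    Order_0000001    222.8
    Order_0000002    522.8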