zoukankan      html  css  js  c++  java
  • 统计电视机顶盒中无效用户数据,并以压缩格式输出有效用户数据

      前面我们学习了如何使用MapReduce计数器,那么我们通过下面这个项目巩固我们所学

    1、介绍

      本项目我们使用电视机顶盒数据,统计出无效用户数据记录,并解析出有效的用户数据以压缩格式输出

    2、数据集

      image

      数据来源于“hadoop小文件合并”处理后的结果

    3、分析

      基于需求,我们通过以下几步完成:

      1、首先使用Jsoup,解析出html格式的机顶盒数据

      2、编写Mapper类,自定义计数器统计无效的机顶盒数据,并将有效的机顶盒数据以压缩格式输出

    4、实现

      1、首先定义一个ParseTVData类,解析输入数据集,并以list集合返回

    package com.buaa;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    import org.jsoup.Jsoup;
    import org.jsoup.nodes.Document;
    import org.jsoup.nodes.Element;
    import org.jsoup.select.Elements;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /** 
    * @ProjectName CountSetTopBoxUserData
    * @PackageName com.buaa
    * @ClassName ParseTVData
    * @Description 解析数据
    * @Author 刘吉超
    * @Date 2016-05-28 16:15:08
    */
    public class ParseTVData {
        private static Logger logger = LoggerFactory.getLogger(ParseTVData.class);
        
        /**
         * 使用 Jsoup 工具,解析输入数据
         * 
         * @param text
         * @return List
         */
        public static List<String> transData(String text) {
            List<String> list = new ArrayList<String>();
            
            try {
                // jsoup解析数据
                Document doc = Jsoup.parse(text);
                Elements content = doc.getElementsByTag("WIC");
                
                // 机顶盒号
                String stbNum = content.get(0).attr("stbNum");
                if(StringUtils.isEmpty(stbNum)){
                    return list;
                }
                
                // 日期
                String date = content.get(0).attr("date");
                
                Elements els = doc.getElementsByTag("A");
                if (els.isEmpty()) {
                    return list;
                }
                
                for (Element el : els) {
                    // 结束时间
                    String e = el.attr("e");
                    // 开始时间
                    String s = el.attr("s");
                    // 频道名称
                    String sn = el.attr("sn");
                    
                    StringBuilder rec = new StringBuilder().append(stbNum).append("@").append(date).append("@").append(sn).append("@").append(s).append("@").append(e);
                    
                    list.add(rec.toString());
                }
            } catch (Exception e) {
                logger.error("", e);
                return list;
            }
            return list;
        }
    }

      2、编写Mapper类,自定义计数器统计无效的机顶盒数据,并将有效的机顶盒数据以压缩格式输出

    package com.buaa;
    
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /** 
    * @ProjectName CountSetTopBoxUserData
    * @PackageName com.buaa
    * @ClassName CountUserData
    * @Description 统计电视机顶盒中无效用户数据,并以压缩格式输出
    * @Author 刘吉超
    * @Date 2016-05-28 16:11:12
    */
    public class CountUserData extends Configured implements Tool {
        // 定义枚举对象
        public static enum LOG_PROCESSOR_COUNTER {
            BAD_RECORDS
        };
        
        /**
         * 解析数据,统计无效数据,并输出有效数据
         */
        public static class CounterAndCompressionMapper extends Mapper<LongWritable, Text, Text, Text> {
            protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
                // 解析每条机顶盒记录,返回list集合
                List<String> list = ParseTVData.transData(value.toString());
                
                // 无效记录
                if (list.isEmpty()) {
                    // 动态自定义计数器
                    context.getCounter("ErrorRecordCounter", "ERROR_Record_TVData").increment(1);
                    // 枚举声明计数器
                    context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).increment(1);
                } else {
                    for (String validateRecord : list) {
                        //输出解析数据
                        context.write(new Text(validateRecord), new Text(""));
                    }
                }
    
            }
        }
        
        @SuppressWarnings("deprecation")
        @Override
        public int run(String[] args) throws Exception {
            // 读取配置文件
            Configuration conf = new Configuration();
            
            // 如果输出目录存在,则删除 
            Path mypath = new Path(args[1]);
            FileSystem hdfs = mypath.getFileSystem(conf);
            if (hdfs.isDirectory(mypath)) {
                // 删除已经存在的文件路径
                hdfs.delete(mypath, true);
            }
            
            // 新建一个任务
            Job job = new Job(conf, "CountUserData");
            // 设置主类
            job.setJarByClass(CountUserData.class);
            
            // Mapper
            job.setMapperClass(CounterAndCompressionMapper.class);
            
            // 输出key类型
            job.setOutputKeyClass(Text.class);
            // 输出value类型
            job.setOutputValueClass(Text.class);
            
            // 输入路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // 输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            // 对输出结果设置压缩
            FileOutputFormat.setCompressOutput(job, true);
            // 设置压缩类型
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        public static void main(String[] args) throws Exception {        
            String[] date = {"20120917","20120918","20120919","20120920","20120921","20120922","20120923"};
            
            int result = 1;
            for(String dt : date){
                String[] args0 = { 
                        "hdfs://hadoop1:9000/buaa/tv/" + dt + ".txt",
                        "hdfs://hadoop1:9000/buaa/tv/out/"+dt
                    };
                result = ToolRunner.run(new Configuration(), new CountUserData(), args0);
            }
            
            System.exit(result);
        }
    }

    5、运行结果

      1、查看计数器统计的无效数据

      image

      2、查看输出目录下的压缩文件

      image

    如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
    如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
    如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

    实现代码及数据:下载

  • 相关阅读:
    mysql_config 问题
    软考倒计时3天
    软考倒计时5天
    Pdf 解密后复制文字乱码
    软考倒计时7天:题目书中的易混点
    应急储备和管理储备
    软考倒计时9天:100个主要知识点
    软考倒计时10天
    软考倒计时15天
    软考倒计时18天
  • 原文地址:https://www.cnblogs.com/codeOfLife/p/5538956.html
Copyright © 2011-2022 走看看