zoukankan      html  css  js  c++  java
  • Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)

    1.需求:

      现有一些原始日志需要做增强解析处理,流程:

      1、 从原始日志文件中读取数据(日志文件:https://pan.baidu.com/s/12hbDvP7jMu9yE-oLZXvM_g)

      2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志

      3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录

    2.需求分析:

      程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可

    以通过自定义outputformat来实现

    3.需求实现: 

    技术实现要点:

      1、 在mapreduce中访问外部资源(知识数据库)

      2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()


    代码实现:

    1.数据库获取数据的工具类:

      1.首先启动数据库服务(192.168.232.201):service mysql start

      2.使用远程客户端连接数据库工具Navicat操作数据库:导入创建url_rule表语句和导入该表数据

      3.创建表语句及其数据下载:https://pan.baidu.com/s/1k74-o8wbFp5QC8Ee4Fu1YQ

      

    package cn.bigdata.hdfs.LogEnhance;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Map;
    
    public class DBLoader {
        public static void dbLoader(Map<String, String> ruleMap) throws Exception {
    
            Connection conn = null;
            Statement st = null;
            ResultSet res = null;
            
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://192.168.232.201:3306/mysql", "root", "root");
                st = conn.createStatement();
                res = st.executeQuery("select url,content from url_rule");
                while (res.next()) {
                    ruleMap.put(res.getString(1), res.getString(2));
                }
            } finally {
                try{
                    if(res!=null){
                        res.close();
                    }
                    if(st!=null){
                        st.close();
                    }
                    if(conn!=null){
                        conn.close();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
    View Code

     2.自定义一个outputformat继承自FileOutputFormat,实现getRecordWriter方法:

    package cn.bigdata.hdfs.LogEnhance;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    /**
     * maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter
     * 然后再调用RecordWriter的write(k,v)方法将数据写出
     */
    public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
    
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            ///拿到一个文件系统操作的客户端实例对象
            FileSystem fs = FileSystem.get(context.getConfiguration());
    
            Path enhancePath = new Path("F:/temp/en/log.dat");
            Path tocrawlPath = new Path("F:/temp/crw/url.dat");
            //流式创建文件
            FSDataOutputStream enhancedOs = fs.create(enhancePath);
            FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);
    
            return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
        }
    
        /**
         * 构造一个自己的recordwriter
         */
        static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
            
            FSDataOutputStream enhancedOs = null;
            FSDataOutputStream tocrawlOs = null;
    
            public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
                super();
                this.enhancedOs = enhancedOs;
                this.tocrawlOs = tocrawlOs;
            }
            //实现抽象方法write
            @Override
            public void write(Text key, NullWritable value) throws IOException, InterruptedException {
                String result = key.toString();
                // 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.dat
                if (result.contains("tocrawl")) {
                    tocrawlOs.write(result.getBytes());
                } else {
                    // 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.dat
                    enhancedOs.write(result.getBytes());
                }
            }
            //实现抽象方法close
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                if (tocrawlOs != null) {
                    tocrawlOs.close();
                }
                if (enhancedOs != null) {
                    enhancedOs.close();
                }
            }
        }
    }

     3.开发mapreduce处理流程:

      这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行

    原始日志后面

      maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中

    package cn.bigdata.hdfs.LogEnhance;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogEnhance {
    
        static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            Map<String, String> ruleMap = new HashMap<String, String>();
            Text k = new Text();
            NullWritable v = NullWritable.get();
    
            // 从数据库中加载规则信息倒ruleMap中
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
    
                try {
                    DBLoader.dbLoader(ruleMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称
                Counter counter = context.getCounter("malformed", "malformedline");
                String line = value.toString();
                String[] fields = StringUtils.split(line, "	");
                try {
                    String url = fields[26];
                    String content_tag = ruleMap.get(url);
                    // 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志
                    if (content_tag == null) {
                        k.set(url + "	" + "tocrawl" + "
    ");
                        context.write(k, v);
                    } else {
                        k.set(line + "	" + content_tag + "
    ");
                        context.write(k, v);
                    }
                } catch (Exception exception) {
                    counter.increment(1);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(LogEnhance.class);
            job.setMapperClass(LogEnhanceMapper.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法
            job.setOutputFormatClass(LogEnhanceOutputFormat.class);
            //本地原始日志文件存放目录
            FileInputFormat.setInputPaths(job, new Path("F:/webloginput/"));
            // 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat
            // 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出path
            FileOutputFormat.setOutputPath(job, new Path("F:/weblogoutput/"));
    
            // 不需要reducer
            job.setNumReduceTasks(0);
    
            job.waitForCompletion(true);
            System.exit(0);
        }
    }
    View Code

     总结:

      1.其中在mapreduce程序中用到了计数器;获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称

      2.拷贝mysql的驱动包到工程的lib目录下,这里使用本地运行模式;https://pan.baidu.com/s/1ldzQ0i5qdvvJ3Yw08LvF5Q

      3.maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter,然后再调用

    RecordWriter的write(k,v)方法将数据写出

      4.在setup方法中完成知识库的加载,写入到Map中

      5.Map端在Context,write时,如果后面没有Reduce,将没有整个的shuffe过程,将直接调用outPutFormat进行输出,进来什

    么顺序,则出去什么顺序(总之:没有reduce就没有shuffle)

      

     

     

     

     

  • 相关阅读:
    idea_pyspark 环境配置
    Win7 单机Spark和PySpark安装
    Spark在Windows下的环境搭建
    linux 登陆key生成
    nginx 根据参数选择文档根目录
    系统操作日志设计(转)
    smarty、smarty格式化、smarty整数、smarty float、smarty各种转换方式、smarty日期转换等等 (转)
    Mac下面的SecureCRT(附破解方案) 更新到最新的7.3.2(转)
    nginx php-fpm 输出php错误日志
    解决PHP显示Warning和Notice等问题
  • 原文地址:https://www.cnblogs.com/yaboya/p/9258070.html
Copyright © 2011-2022 走看看