zoukankan      html  css  js  c++  java
  • Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)

    1.需求:

      现有一些原始日志需要做增强解析处理,流程:

      1、 从原始日志文件中读取数据(日志文件:https://pan.baidu.com/s/12hbDvP7jMu9yE-oLZXvM_g)

      2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志

      3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录

    2.需求分析:

      程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可

    以通过自定义outputformat来实现

    3.需求实现: 

    技术实现要点:

      1、 在mapreduce中访问外部资源(知识数据库)

      2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()


    代码实现:

    1.数据库获取数据的工具类:

      1.首先启动数据库服务(192.168.232.201):service mysql start

      2.使用远程客户端连接数据库工具Navicat操作数据库:导入创建url_rule表语句和导入该表数据

      3.创建表语句及其数据下载:https://pan.baidu.com/s/1k74-o8wbFp5QC8Ee4Fu1YQ

      

    package cn.bigdata.hdfs.LogEnhance;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Map;
    
    public class DBLoader {
        public static void dbLoader(Map<String, String> ruleMap) throws Exception {
    
            Connection conn = null;
            Statement st = null;
            ResultSet res = null;
            
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://192.168.232.201:3306/mysql", "root", "root");
                st = conn.createStatement();
                res = st.executeQuery("select url,content from url_rule");
                while (res.next()) {
                    ruleMap.put(res.getString(1), res.getString(2));
                }
            } finally {
                try{
                    if(res!=null){
                        res.close();
                    }
                    if(st!=null){
                        st.close();
                    }
                    if(conn!=null){
                        conn.close();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
    View Code

     2.自定义一个outputformat继承自FileOutputFormat,实现getRecordWriter方法:

    package cn.bigdata.hdfs.LogEnhance;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    /**
     * maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter
     * 然后再调用RecordWriter的write(k,v)方法将数据写出
     */
    public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
    
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            ///拿到一个文件系统操作的客户端实例对象
            FileSystem fs = FileSystem.get(context.getConfiguration());
    
            Path enhancePath = new Path("F:/temp/en/log.dat");
            Path tocrawlPath = new Path("F:/temp/crw/url.dat");
            //流式创建文件
            FSDataOutputStream enhancedOs = fs.create(enhancePath);
            FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);
    
            return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
        }
    
        /**
         * 构造一个自己的recordwriter
         */
        static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
            
            FSDataOutputStream enhancedOs = null;
            FSDataOutputStream tocrawlOs = null;
    
            public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
                super();
                this.enhancedOs = enhancedOs;
                this.tocrawlOs = tocrawlOs;
            }
            //实现抽象方法write
            @Override
            public void write(Text key, NullWritable value) throws IOException, InterruptedException {
                String result = key.toString();
                // 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.dat
                if (result.contains("tocrawl")) {
                    tocrawlOs.write(result.getBytes());
                } else {
                    // 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.dat
                    enhancedOs.write(result.getBytes());
                }
            }
            //实现抽象方法close
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                if (tocrawlOs != null) {
                    tocrawlOs.close();
                }
                if (enhancedOs != null) {
                    enhancedOs.close();
                }
            }
        }
    }

     3.开发mapreduce处理流程:

      这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行

    原始日志后面

      maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中

    package cn.bigdata.hdfs.LogEnhance;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogEnhance {
    
        static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            Map<String, String> ruleMap = new HashMap<String, String>();
            Text k = new Text();
            NullWritable v = NullWritable.get();
    
            // 从数据库中加载规则信息倒ruleMap中
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
    
                try {
                    DBLoader.dbLoader(ruleMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称
                Counter counter = context.getCounter("malformed", "malformedline");
                String line = value.toString();
                String[] fields = StringUtils.split(line, "	");
                try {
                    String url = fields[26];
                    String content_tag = ruleMap.get(url);
                    // 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志
                    if (content_tag == null) {
                        k.set(url + "	" + "tocrawl" + "
    ");
                        context.write(k, v);
                    } else {
                        k.set(line + "	" + content_tag + "
    ");
                        context.write(k, v);
                    }
                } catch (Exception exception) {
                    counter.increment(1);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(LogEnhance.class);
            job.setMapperClass(LogEnhanceMapper.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法
            job.setOutputFormatClass(LogEnhanceOutputFormat.class);
            //本地原始日志文件存放目录
            FileInputFormat.setInputPaths(job, new Path("F:/webloginput/"));
            // 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat
            // 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出path
            FileOutputFormat.setOutputPath(job, new Path("F:/weblogoutput/"));
    
            // 不需要reducer
            job.setNumReduceTasks(0);
    
            job.waitForCompletion(true);
            System.exit(0);
        }
    }
    View Code

     总结:

      1.其中在mapreduce程序中用到了计数器;获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称

      2.拷贝mysql的驱动包到工程的lib目录下,这里使用本地运行模式;https://pan.baidu.com/s/1ldzQ0i5qdvvJ3Yw08LvF5Q

      3.maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter,然后再调用

    RecordWriter的write(k,v)方法将数据写出

      4.在setup方法中完成知识库的加载,写入到Map中

      5.Map端在Context,write时,如果后面没有Reduce,将没有整个的shuffe过程,将直接调用outPutFormat进行输出,进来什

    么顺序,则出去什么顺序(总之:没有reduce就没有shuffle)

      

     

     

     

     

  • 相关阅读:
    485串口接线
    mvc3 升级mvc5
    VB连接ACCESS数据库,使用 LIKE 通配符问题
    VB6 读写西门子PLC
    可用的 .net core 支持 RSA 私钥加密工具类
    解决 Win7 远程桌面 已停止工作的问题
    解决 WinForm 重写 CreateParams 隐藏窗口以后的显示问题
    解决安装 .net framework 发生 extracting files error 问题
    CentOS7 安装配置笔记
    通过特殊处理 Resize 事件解决 WinForm 加载时闪烁问题的一个方法
  • 原文地址:https://www.cnblogs.com/yaboya/p/9258070.html
Copyright © 2011-2022 走看看