  • [E-commerce Log Project, Part 4] Data Cleaning (ETL)

    Environment
      hadoop-2.6.5

      First, why is data cleaning needed at all? The data collected from the various channels cannot be used directly for the next stage of analysis. It has to go through steps such as cleaning missing values, fixing format and content problems, removing logically inconsistent records, dropping unneeded data, and verifying associations between records, so that it becomes usable data.
    For the concrete work involved, see the article "数据清洗的一些梳理" (notes on data cleaning).

    Once you understand ETL, there are tools such as the open-source Kettle that can do this work. But you can also build it entirely yourself: ETL is really just three stages, data extraction, data cleaning, and storage of the cleaned data. Hadoop, Spark, and Kafka can all be used for this, and the cleaning rules can be developed as needed.
    Here we write a MapReduce job on Hadoop to do the ETL work.

    According to the architecture design, the data produced by ETL is stored in HBase, so the ETL stage breaks down into four parts:
    1. Filtering out dirty data
    2. IP resolution with IPSeeker
    3. Browser (user-agent) parsing with UASparser
    4. Rowkey design

    Runner:

    package com.sxt.etl.mr.ald;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.GlobalConstants;
    import com.sxt.util.TimeUtil;
    
    /**
     * Runner class for the MapReduce job.
     */
    public class AnalyserLogDataRunner implements Tool 
    {
        private static final Logger logger = Logger.getLogger(AnalyserLogDataRunner.class);
        private Configuration conf = null;
    
        public static void main(String[] args) 
        {
            try 
            {
                ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
            } 
            catch (Exception e) 
            {
                logger.error("Exception while running the log-parsing job", e);
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void setConf(Configuration conf) {
            conf.set("fs.defaultFS", "hdfs://node1:8020");
            //conf.set("yarn.resourcemanager.hostname", "node3");
            //conf.set("hbase.zookeeper.quorum", "node1,node2,node3"); // used to connect to HBase
            conf.set("hbase.zookeeper.quorum", "node104"); // used to connect to HBase
            this.conf = HBaseConfiguration.create(conf);
        }
    
        @Override
        public Configuration getConf() {
            return this.conf;
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = this.getConf();
            this.processArgs(conf, args);
    
            Job job = Job.getInstance(conf, "analyser_logdata");
    
            // Code needed when submitting the job locally but running it on the cluster:
            // File jarFile = EJob.createTempJar("target/classes");
            // ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
            // End of local-submit / cluster-run code
    
            job.setJarByClass(AnalyserLogDataRunner.class);
            job.setMapperClass(AnalyserLogDataMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Put.class);
            // Reducer configuration
            // 1. Running on the cluster as a jar (addDependencyJars must be true, which is the default)
            // TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
            // 2. Running locally (addDependencyJars must be false)
            TableMapReduceUtil.initTableReducerJob(
                    EventLogConstants.HBASE_NAME_EVENT_LOGS, 
                    null, 
                    job, 
                    null, 
                    null,
                    null, 
                    null, 
                    false);
            job.setNumReduceTasks(0);
    
            // Set the input path
            this.setJobInputPaths(job);
            return job.waitForCompletion(true) ? 0 : -1;
        }
    
        /**
         * Parse the command-line arguments.
         * 
         * @param conf
         * @param args
         */
        private void processArgs(Configuration conf, String[] args) {
            String date = null;
            for (int i = 0; i < args.length; i++) {
                if ("-d".equals(args[i])) {
                    if (i + 1 < args.length) {
                        date = args[++i];
                        break;
                    }
                }
            }
            
            System.out.println("-----" + date);
    
            // date must be in the format yyyy-MM-dd
            if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
                // date is not a valid date string
                date = TimeUtil.getYesterday(); // default to yesterday
                System.out.println(date);
            }
            conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
        }
    
        /**
         * Set the job's input path.
         * 
         * @param job
         */
        private void setJobInputPaths(Job job) 
        {
            Configuration conf = job.getConfiguration();
            FileSystem fs = null;
            try 
            {
                fs = FileSystem.get(conf);
                String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
                // Path inputPath = new Path("/flume/" +
                // TimeUtil.parseLong2String(TimeUtil.parseString2Long(date),
                // "MM/dd/"));
                Path inputPath = new Path("/log/"
                        + TimeUtil.parseLong2String(
                                TimeUtil.parseString2Long(date), "yyyyMMdd")
                        + "/");
                
                if (fs.exists(inputPath)) 
                {
                    FileInputFormat.addInputPath(job, inputPath);
                }
                else 
                {
                    throw new RuntimeException("Input path does not exist: " + inputPath);
                }
            } 
            catch (IOException e)
            {
                throw new RuntimeException("Exception while setting the job's MapReduce input path", e);
            } 
            finally 
            {
                if (fs != null) 
                {
                    try 
                    {
                        fs.close();
                    } 
                    catch (IOException e) 
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    }
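
    With the Runner in place, the job can be submitted from the command line. A minimal sketch of an invocation (the jar name etl.jar is only a placeholder for whatever your build produces; -d picks the day to process, in yyyy-MM-dd format, and falls back to yesterday when omitted or invalid):

    hadoop jar etl.jar com.sxt.etl.mr.ald.AnalyserLogDataRunner -d 2019-06-28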

    Mapper:

    package com.sxt.etl.mr.ald;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.zip.CRC32;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.log4j.Logger;
    
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.EventLogConstants.EventEnum;
    import com.sxt.etl.util.LoggerUtil;
    
    /**
     * Custom mapper that parses the log data.
     * 
     * @author root
     */
    public class AnalyserLogDataMapper extends Mapper<LongWritable, Text, NullWritable, Put> 
    {
        private final Logger logger = Logger.getLogger(AnalyserLogDataMapper.class);
        private int inputRecords, filterRecords, outputRecords; // counters, used to log how many records were filtered
        private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
        private CRC32 crc32 = new CRC32();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            this.inputRecords++;
            this.logger.debug("Analyse data of :" + value);
    
            try 
            {
                // Parse the log line
                Map<String, String> clientInfo = LoggerUtil.handleLog(value.toString());
    
                // Filter out records that failed to parse
                if (clientInfo.isEmpty()) {
                    this.filterRecords++;
                    return;
                }
    
                // Get the event name
                String eventAliasName = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
                EventEnum event = EventEnum.valueOfAlias(eventAliasName);
                switch (event) {
                case LAUNCH:
                case PAGEVIEW:
                case CHARGEREQUEST:
                case CHARGEREFUND:
                case CHARGESUCCESS:
                case EVENT:
                    // Process the record
                    this.handleData(clientInfo, event, context);
                    break;
                default:
                    this.filterRecords++;
                    this.logger.warn("Unable to parse this event; event name: " + eventAliasName);
                }
            } 
            catch (Exception e) 
            {
                this.filterRecords++;
                this.logger.error("Exception while processing record: " + value, e);
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            logger.info("Input records: " + this.inputRecords + "; output records: " + this.outputRecords + "; filtered records: " + this.filterRecords);
        }
    
        /**
         * Process a single parsed record and write it to HBase.
         * 
         * @param clientInfo
         * @param event
         * @param context
         * @throws InterruptedException
         * @throws IOException
         */
        private void handleData(Map<String, String> clientInfo, EventEnum event, Context context) throws IOException, InterruptedException 
        {
            String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
            String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
            String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
            if (StringUtils.isNotBlank(serverTime))
            {
                // The server time must not be blank
                clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the raw user-agent string (already parsed into separate columns)
                String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime); // serverTime is a timestamp
                Put put = new Put(Bytes.toBytes(rowkey));
                for (Map.Entry<String, String> entry : clientInfo.entrySet()) 
                {
                    if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) 
                    {
                        put.add(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                    }
                }
                context.write(NullWritable.get(), put);
                this.outputRecords++;
            } 
            else 
            {
                this.filterRecords++;
            }
        }
    
        /**
         * Build the rowkey from uuid, memberId, the event alias and serverTime.
         * 
         * @param uuid
         * @param memberId
         * @param eventAliasName
         * @param serverTime
         * @return
         */
        private String generateRowKey(String uuid, String memberId, String eventAliasName, String serverTime) 
        {
            StringBuilder sb = new StringBuilder();
            sb.append(serverTime).append("_");
            this.crc32.reset();
            if (StringUtils.isNotBlank(uuid)) {
                this.crc32.update(uuid.getBytes());
            }
            if (StringUtils.isNotBlank(memberId)) {
                this.crc32.update(memberId.getBytes());
            }
            this.crc32.update(eventAliasName.getBytes());
            sb.append(this.crc32.getValue() % 100000000L);
            return sb.toString();
        }
    }
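
    The rowkey built by generateRowKey is the server timestamp, an underscore, and a CRC32 of uuid + memberId + event alias taken modulo 100000000, so rows sort by time while different events in the same millisecond still get distinct keys. A minimal standalone sketch (with made-up example values) shows the shape of the result:

    import java.util.zip.CRC32;

    public class RowKeyDemo {
        public static void main(String[] args) {
            // Hypothetical example values, mirroring the mapper's generateRowKey logic
            String serverTime = "1561627763553";
            String uuid = "E5631595-EDC2-4B3B-A306-B19576D74DC3";
            String eventAliasName = "e_l";

            CRC32 crc32 = new CRC32();
            crc32.update(uuid.getBytes());           // memberId is blank here, so it is skipped
            crc32.update(eventAliasName.getBytes());

            // Prints serverTime + "_" + (crc32 mod 1e8), e.g. 1561627763553_ followed by up to 8 digits
            System.out.println(serverTime + "_" + (crc32.getValue() % 100000000L));
        }
    }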

     The worker class that does the actual log processing:

    package com.sxt.etl.util;
    
    import java.net.URLDecoder;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.log4j.Logger;
    
    import com.sxt.common.EventLogConstants;
    import com.sxt.etl.util.IPSeekerExt.RegionInfo;
    import com.sxt.etl.util.UserAgentUtil.UserAgentInfo;
    import com.sxt.util.TimeUtil;
    
    /**
     * Worker class that does the actual log processing.
     * 
     * @author root
     */
    public class LoggerUtil {
        private static final Logger logger = Logger.getLogger(LoggerUtil.class);
        private static IPSeekerExt ipSeekerExt = new IPSeekerExt();
    
        /**
         * Process the log line logText and return the parsed fields as a map.<br/>
         * If logText does not match the expected format, an empty map is returned.
         * 
         * @param logText
         * @return
         */
        public static Map<String, String> handleLog(String logText) 
        {
            Map<String, String> clientInfo = new HashMap<String, String>();
            if (StringUtils.isNotBlank(logText)) 
            {
                //192.168.118.1^A1561656575.201^Anode101^A/log.gif?en=e_l&ver=1&pl=website&sdk=js&u_ud=E5631595-EDC2-4B3B-A306-B19576D74DC3&u_sd=C7C0D4E3-7E60-479B-AC1C-2F5305EC20D4&c_time=1561627763553&l=zh-CN&b_iev=Mozilla%2F5.0%20(Windows%20NT%206.1%3B%20Win64%3B%20x64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F75.0.3770.100%20Safari%2F537.36&b_rst=1920*1080
                String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR);
                if (splits.length == 4) 
                {
                    // Log format: ip^Aserver time^Ahost^Arequest parameters
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_IP, splits[0].trim()); // set the IP
                    // set the server time
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(TimeUtil.parseNginxServerTime2Long(splits[1].trim())));
                    int index = splits[3].indexOf("?");
                    if (index > -1) {
                        String requestBody = splits[3].substring(index + 1); // the request parameters, i.e. the collected data
                        // parse the request parameters
                        handleRequestBody(requestBody, clientInfo);
                        // parse the userAgent
                        handleUserAgent(clientInfo);
                        // resolve the IP address
                        handleIp(clientInfo);
                    } else {
                        // malformed data
                        clientInfo.clear();
                    }
                }
            }
            return clientInfo;
        }
    
        /**
         * Resolve the IP address to country / province / city.
         * 
         * @param clientInfo
         */
        private static void handleIp(Map<String,String> clientInfo) {
            if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_IP)) {
                String ip = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_IP);
                RegionInfo info = ipSeekerExt.analyticIp(ip);
                if (info != null) {
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity());
                }
            }
        }
    
        /**
         * Parse the browser userAgent string.
         * 
         * @param clientInfo
         */
        private static void handleUserAgent(Map<String, String> clientInfo) {
            if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)) {
                UserAgentInfo info = UserAgentUtil.analyticUserAgent(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT));
                if (info != null) {
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, info.getOsName());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, info.getOsVersion());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, info.getBrowserName());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, info.getBrowserVersion());
                }
            }
        }
    
        /**
         * Parse the request parameters.
         * 
         * @param requestBody
         * @param clientInfo
         */
        private static void handleRequestBody(String requestBody, Map<String, String> clientInfo) 
        {
            if (StringUtils.isNotBlank(requestBody)) 
            {
                String[] requestParams = requestBody.split("&");
                for (String param : requestParams) 
                {
                    if (StringUtils.isNotBlank(param)) 
                    {
                        int index = param.indexOf("=");
                        if (index < 0) 
                        {
                            logger.warn("Unable to parse parameter: " + param + ", request body: " + requestBody);
                            continue;
                        }
    
                        String key = null, value = null;
                        try 
                        {
                            key = param.substring(0, index);
                            value = URLDecoder.decode(param.substring(index + 1), "utf-8");
                        } 
                        catch (Exception e) 
                        {
                            logger.warn("Exception while URL-decoding", e);
                            continue;
                        }
                        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value))
                        {
                            clientInfo.put(key, value);
                        }
                    }
                }
            }
        }
    }
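
    To see what the parsing yields for a concrete line, here is a self-contained sketch that replays the request-body parsing on the sample record from the comment above. It assumes EventLogConstants.LOG_SEPARTIOR is "\u0001" (the ^A character shown in the sample) and skips the IP and user-agent enrichment, which need the IPSeeker data file and UASparser:

    import java.net.URLDecoder;
    import java.util.HashMap;
    import java.util.Map;

    public class HandleLogDemo {
        public static void main(String[] args) throws Exception {
            // Sample record; the fields are separated by the 0x01 (^A) control character
            String logText = "192.168.118.1\u00011561656575.201\u0001node101\u0001"
                    + "/log.gif?en=e_l&ver=1&pl=website&sdk=js&l=zh-CN&b_rst=1920*1080";
            String[] splits = logText.trim().split("\u0001");
            String requestBody = splits[3].substring(splits[3].indexOf("?") + 1);

            // The same key=value parsing as handleRequestBody
            Map<String, String> clientInfo = new HashMap<String, String>();
            for (String param : requestBody.split("&")) {
                int index = param.indexOf("=");
                if (index < 0) continue;
                clientInfo.put(param.substring(0, index),
                        URLDecoder.decode(param.substring(index + 1), "utf-8"));
            }
            System.out.println(clientInfo); // {en=e_l, ver=1, pl=website, ...}
        }
    }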

     Create the HBase table that will receive the cleaned data:

    hbase(main):002:0> create 'eventlog','log'
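
    After the job has run, a quick scan from the HBase shell confirms that cleaned events arrived; LIMIT keeps the output readable:

    hbase(main):003:0> scan 'eventlog', {LIMIT => 1}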

    Code reference: wjy.rar


    References:
    The open-source ETL tool Kettle
