  • Big Data Offline Analysis Platform: User Data ETL

    ETL Goals

    Parse the collected log data and save the parsed results to HBase. HBase is chosen as the storage layer mainly because:

    its wide-table design suits storing data in multiple formats (different events have different storage formats).

    During the ETL process, the collected data needs to be processed, including ip address resolution, userAgent parsing, and server-time parsing.

    In this project, ip resolution uses the Chunzhen (纯真) IP database; the official site is http://www.cz88.net/
    Alternatively, ip resolution can be done through the IP interface provided by Taobao:
    Address: http://ip.taobao.com/
    Interface: http://ip.taobao.com/service/getIpInfo.php?ip=[ip address string]
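
    As a reference, here is a minimal Java sketch of calling that interface. This is an old public endpoint, so its availability and exact JSON response layout may have changed; the sample ip is arbitrary:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class TaobaoIpLookup {
        public static void main(String[] args) throws Exception {
            String ip = "123.125.71.38"; // arbitrary sample ip
            URL url = new URL("http://ip.taobao.com/service/getIpInfo.php?ip=" + ip);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setConnectTimeout(3000);
            conn.setReadTimeout(3000);
            StringBuilder body = new StringBuilder();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
            }
            // the endpoint historically returned JSON along the lines of
            // {"code":0,"data":{"country":"...","region":"...","city":"..."}};
            // parse it with any JSON library before copying fields into clientInfo
            System.out.println(body);
        }
    }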

    ETL Storage
    The ETL results are stored in HBase. Since different events carry different data formats, the final ETL output goes into an HBase table with a single column family; the rowkey is generated as timestamp + a CRC encoding of (uuid + memberId + event alias). HBase table creation command: create 'event_logs', 'info'
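
    As a concrete illustration of this scheme (it mirrors the generateRowKey method of the mapper shown later; the uuid and member id values are made up):

    import java.util.zip.CRC32;

    public class RowKeyExample {
        public static void main(String[] args) {
            // rowkey = <serverTime>_<crc32(uuid + memberId + eventAlias) % 100000000>
            CRC32 crc32 = new CRC32();
            crc32.update("FF9E8EA8-C349-4A89-B5D7-25E0F3F8F0C6".getBytes()); // made-up uuid
            crc32.update("member-001".getBytes()); // made-up member id
            crc32.update("e_pv".getBytes()); // pageview event alias
            String rowkey = "1449410796976" + "_" + (crc32.getValue() % 100000000L);
            System.out.println(rowkey); // e.g. 1449410796976_12345678
        }
    }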

    The steps are as follows:

    1. Modify the pom file and add the Hadoop and HBase dependencies

    <dependencies>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        <!-- userAgent parsing -->
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.2</version>
        </dependency>
        <!-- hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- hadoop end -->
        <!-- hbase start -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.3</version>
        </dependency>
        <!-- hbase end -->
        <!-- mysql start -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.18</version>
        </dependency>
        <!-- mysql end -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <!-- htrace versions required by the hadoop/hbase transitive dependencies -->
        <dependency>
            <groupId>org.cloudera.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>2.04</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core4</artifactId>
            <version>4.0.1-incubating</version>
        </dependency>
    </dependencies>


    2. Add the LoggerUtils class, along with the EventLogConstants constants class and the TimeUtil utility class it depends on.
    LoggerUtils' main job is to parse a log line and return the fields as a map.
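
    To get a feel for what the parser produces, here is a hypothetical usage sketch (the log line and all field values are made up; "^A" stands for the separator written by nginx, and the demo class is assumed to sit in the same package as LoggerUtils):

    import java.util.Map;

    public class LoggerUtilsDemo {
        public static void main(String[] args) {
            // hypothetical log line: ip^A serverTime^A host^A request
            String logText = "192.168.10.1^A1449410796.976^A192.168.10.100^A"
                    + "/log.gif?en=e_pv&ver=1&pl=website&u_ud=ABC-123&u_sd=SESSION-1"
                    + "&b_iev=Mozilla%2F5.0&p_url=http%3A%2F%2Fwww.example.com%2Findex.html";
            Map<String, String> clientInfo = LoggerUtils.handleLog(logText);
            // clientInfo now holds ip and s_time, the url-decoded request parameters,
            // plus the derived os/browser and country/province/city fields.
            System.out.println(clientInfo);
        }
    }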

    package com.kk.etl.util;
    
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.log4j.Logger;
    
    import com.kk.common.EventLogConstants;
    import com.kk.etl.util.IpSeekerExt.RegionInfo;
    import com.kk.etl.util.UserAgentUtil.UserAgentInfo;
    import com.kk.util.TimeUtil;
    public class LoggerUtils {
        private static final Logger logger = Logger.getLogger(LoggerUtils.class);
        private static IpSeekerExt ipSeekerExt = new IpSeekerExt();

        /**
         * Parse the log line logText and return the results as a map.<br/>
         * If logText does not match the expected format, an empty map is returned.
         * 
         * @param logText
         * @return
         */
        public static Map<String, String> handleLog(String logText) {
            Map<String, String> clientInfo = new HashMap<String, String>();
            if (logText != null && !logText.isEmpty()) {
                String[] splits = logText.split(EventLogConstants.LOG_SEPARTIOR);
                if (splits.length == 4) {
                    // log format: ip^A serverTime^A host^A request
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_IP, splits[0].trim()); // set the ip
                    // set the server time
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(TimeUtil.parseNginxServerTime2Long(splits[1].trim())));
                    int index = splits[3].indexOf("?");
                    if (index > -1) {
                        String requestBody = splits[3].substring(index + 1); // the request parameters, i.e. the collected data
                        // handle the request parameters
                        handleRequestBody(requestBody, clientInfo);
                        // handle the userAgent
                        handleUserAgent(clientInfo);
                        // handle the ip address
                        handleIp(clientInfo);
                    } else {
                        // malformed data
                        clientInfo.clear();
                    }
                }
            }
            return clientInfo;
        }

        /**
         * Parse the request parameters.
         * 
         * @param requestBody
         * @param clientInfo
         */
        private static void handleRequestBody(String requestBody, Map<String, String> clientInfo) {
            String[] requestParams = requestBody.split("&");
            for (String param : requestParams) {
                if (param != null && !param.isEmpty()) {
                    int index = param.indexOf("=");
                    if (index < 0) {
                        logger.warn("Cannot parse parameter: " + param + ", request body: " + requestBody);
                        continue;
                    }
                    try {
                        String p1 = param.substring(0, index);
                        String p2 = URLDecoder.decode(param.substring(index + 1), "utf-8");
                        if (StringUtils.isNotBlank(p1) && StringUtils.isNotBlank(p2)) {
                            clientInfo.put(p1, p2);
                        }
                    } catch (UnsupportedEncodingException e) {
                        logger.warn("Exception while url-decoding a parameter", e);
                    }
                }
            }
        }

        /**
         * Resolve the ip address.
         * 
         * @param clientInfo
         */
        private static void handleIp(Map<String, String> clientInfo) {
            if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_IP)) {
                String ip = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_IP);
                RegionInfo info = ipSeekerExt.analyticIp(ip);
                if (info != null) {
                    if (info.getCountry().equals("unknown") && info.getCity().equals("unknown") && info.getProvince().equals("unknown")) {
                        // fall back to a default region when the ip cannot be resolved
                        info.setCountry("中国");
                        info.setProvince("广东省");
                        info.setCity("广州市");
                    }
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity());
                }
            }
        }

        /**
         * Parse the browser userAgent information.
         * 
         * @param clientInfo
         */
        private static void handleUserAgent(Map<String, String> clientInfo) {
            if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)) {
                UserAgentInfo info = UserAgentUtil.analyticUserAgent(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT));
                if (info != null) {
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, info.getOsName());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, info.getOsVersion());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, info.getBrowserName());
                    clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, info.getBrowserVersion());
                }
            }
        }
    }


    EventLogConstants mainly describes the HBase event_logs table (table name, column family name, column names) as well as the parameter names used in the collected logs. The column names and the parameter names are in fact identical.

    package com.kk.common;
    
    /**
     * Constants describing the HBase event_logs table and the log parameter names.
     * 
     * @author hzk
     */
    public class EventLogConstants {
        
        /**
         * Event enum. Defines the event names.
         * 
         * @author gerry
         *
         */
        public static enum EventEnum {
            LAUNCH(1, "launch event", "e_l"), // launch event: first visit
            PAGEVIEW(2, "page view event", "e_pv"), // page view event
            CHARGEREQUEST(3, "charge request event", "e_crt"), // order creation event
            CHARGESUCCESS(4, "charge success event", "e_cs"), // order payment success event
            CHARGEREFUND(5, "charge refund event", "e_cr"), // order refund event
            EVENT(6, "event duration event", "e_e") // generic event
            ;

            public final int id; // unique identifier
            public final String name; // name
            public final String alias; // alias, the shorthand used during data collection

            private EventEnum(int id, String name, String alias) {
                this.id = id;
                this.name = name;
                this.alias = alias;
            }

            /**
             * Return the event enum matching the given alias; if nothing matches, return null.
             * 
             * @param alias
             * @return
             */
            public static EventEnum valueOfAlias(String alias) {
                for (EventEnum event : values()) {
                    if (event.alias.equals(alias)) {
                        return event;
                    }
                }
                return null;
            }
        }
    
        /**
         * Table name.
         */
        public static final String HBASE_NAME_EVENT_LOGS = "event_logs";

        /**
         * Column family name of the event_logs table.
         */
        public static final String EVENT_LOGS_FAMILY_NAME = "info";

        /**
         * Log field separator (passed to String.split as a regex, so the
         * ^ metacharacter has to be escaped).
         */
        public static final String LOG_SEPARTIOR = "\\^A";

        /**
         * User ip address.
         */
        public static final String LOG_COLUMN_NAME_IP = "ip";

        /**
         * Server time.
         */
        public static final String LOG_COLUMN_NAME_SERVER_TIME = "s_time";

        /**
         * Event name.
         */
        public static final String LOG_COLUMN_NAME_EVENT_NAME = "en";

        /**
         * Version of the data collection client.
         */
        public static final String LOG_COLUMN_NAME_VERSION = "ver";

        /**
         * Unique user identifier.
         */
        public static final String LOG_COLUMN_NAME_UUID = "u_ud";

        /**
         * Unique member identifier.
         */
        public static final String LOG_COLUMN_NAME_MEMBER_ID = "u_mid";

        /**
         * Session id.
         */
        public static final String LOG_COLUMN_NAME_SESSION_ID = "u_sd";
        /**
         * Client time.
         */
        public static final String LOG_COLUMN_NAME_CLIENT_TIME = "c_time";
        /**
         * Language.
         */
        public static final String LOG_COLUMN_NAME_LANGUAGE = "l";
        /**
         * Browser user agent parameter.
         */
        public static final String LOG_COLUMN_NAME_USER_AGENT = "b_iev";
        /**
         * Browser resolution.
         */
        public static final String LOG_COLUMN_NAME_RESOLUTION = "b_rst";
        /**
         * Current url.
         */
        public static final String LOG_COLUMN_NAME_CURRENT_URL = "p_url";
        /**
         * Url of the previous page (referrer).
         */
        public static final String LOG_COLUMN_NAME_REFERRER_URL = "p_ref";
        /**
         * Title of the current page.
         */
        public static final String LOG_COLUMN_NAME_TITLE = "tt";
        /**
         * Order id.
         */
        public static final String LOG_COLUMN_NAME_ORDER_ID = "oid";
        /**
         * Order name.
         */
        public static final String LOG_COLUMN_NAME_ORDER_NAME = "on";
        /**
         * Order amount.
         */
        public static final String LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT = "cua";
        /**
         * Order currency type.
         */
        public static final String LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE = "cut";
        /**
         * Order payment type.
         */
        public static final String LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE = "pt";
        /**
         * Category name.
         */
        public static final String LOG_COLUMN_NAME_EVENT_CATEGORY = "ca";
        /**
         * Action name.
         */
        public static final String LOG_COLUMN_NAME_EVENT_ACTION = "ac";
        /**
         * Prefix of kv parameters.
         */
        public static final String LOG_COLUMN_NAME_EVENT_KV_START = "kv_";
        /**
         * Event duration.
         */
        public static final String LOG_COLUMN_NAME_EVENT_DURATION = "du";
        /**
         * Operating system name.
         */
        public static final String LOG_COLUMN_NAME_OS_NAME = "os";
        /**
         * Operating system version.
         */
        public static final String LOG_COLUMN_NAME_OS_VERSION = "os_v";
        /**
         * Browser name.
         */
        public static final String LOG_COLUMN_NAME_BROWSER_NAME = "browser";
        /**
         * Browser version.
         */
        public static final String LOG_COLUMN_NAME_BROWSER_VERSION = "browser_v";
        /**
         * Country resolved from the ip address.
         */
        public static final String LOG_COLUMN_NAME_COUNTRY = "country";
        /**
         * Province resolved from the ip address.
         */
        public static final String LOG_COLUMN_NAME_PROVINCE = "province";
        /**
         * City resolved from the ip address.
         */
        public static final String LOG_COLUMN_NAME_CITY = "city";

        /**
         * Platform.
         */
        public static final String LOG_COLUMN_NAME_PLATFORM = "pl";
    }


    TimeUtil parses the server time and defines the timestamp format used in the rowkey.
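
    TimeUtil imports com.kk.common.DateEnum, which this post does not list. A minimal sketch that matches the values getDateInfo uses (the original class may carry extra fields, e.g. a display name):

    package com.kk.common;

    /**
     * Time-dimension types consumed by TimeUtil.getDateInfo.
     */
    public enum DateEnum {
        YEAR, SEASON, MONTH, WEEK, DAY, HOUR
    }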

    package com.kk.util;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.commons.lang.StringUtils;
    
    import com.kk.common.DateEnum;
    
    public class TimeUtil {
    
        /**
         * Convert the nginx server time to a timestamp; if parsing fails, return -1.
         * 
         * @param input
         * @return
         */
        public static long parseNginxServerTime2Long(String input) {
            Date date = parseNginxServerTime2Date(input);
            return date == null ? -1L : date.getTime();
        }
    
        /**
         * Convert the nginx server time to a Date object; if parsing fails, return null.
         * 
         * @param input
         *            format: 1449410796.976
         * @return
         */
        public static Date parseNginxServerTime2Date(String input) {
            if (StringUtils.isNotBlank(input)) {
                try {
                    long timestamp = Double.valueOf(Double.valueOf(input.trim()) * 1000).longValue();
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTimeInMillis(timestamp);
                    return calendar.getTime();
                } catch (Exception e) {
                    // nothing
                }
            }
            return null;
        }
    
        /**
         * Check whether the input is a valid date string (yyyy-MM-dd).
         * 
         * @param input
         * @return
         */
        public static boolean isValidateRunningDate(String input) {
            Matcher matcher = null;
            boolean result = false;
            String regex = "[0-9]{4}-[0-9]{2}-[0-9]{2}";
            if (input != null && !input.isEmpty()) {
                Pattern pattern = Pattern.compile(regex);
                matcher = pattern.matcher(input);
            }
            if (matcher != null) {
                result = matcher.matches();
            }
            return result;
        }
        public static final String DATE_FORMAT = "yyyy-MM-dd";
    
        /**
         * Get yesterday's date as a formatted string.
         * 
         * @return
         */
        public static String getYesterday() {
            return getYesterday(DATE_FORMAT);
        }
    
        /**
         * Get yesterday's date as a string in the given format.
         * 
         * @param pattern
         * @return
         */
        public static String getYesterday(String pattern) {
            SimpleDateFormat sdf = new SimpleDateFormat(pattern);
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.DAY_OF_YEAR, -1);
            return sdf.format(calendar.getTime());
        }
    
        /**
         * Convert a yyyy-MM-dd date string to a timestamp.
         * 
         * @param input
         * @return
         */
        public static long parseString2Long(String input) {
            return parseString2Long(input, DATE_FORMAT);
        }
        /**
         * Convert a date string in the given format to a timestamp.
         * 
         * @param input
         * @param pattern
         * @return
         */
        public static long parseString2Long(String input, String pattern) {
            Date date = null;
            try {
                date = new SimpleDateFormat(pattern).parse(input);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            return date.getTime();
        }
    
        /**
         * Convert a timestamp to a yyyy-MM-dd date string.
         * @param input
         * @return
         */
        public static String parseLong2String(long input) {
            return parseLong2String(input, DATE_FORMAT);
        }
    
        /**
         * Convert a timestamp to a string in the given format.
         * 
         * @param input
         * @param pattern
         * @return
         */
        public static String parseLong2String(long input, String pattern) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(input);
            return new SimpleDateFormat(pattern).format(calendar.getTime());
        }
    
        /**
         * Extract the requested time field from a timestamp.
         * 
         * @param time
         *            timestamp
         * @param type
         * @return throws an exception if there is no matching type
         */
        public static int getDateInfo(long time, DateEnum type) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(time);
            if (DateEnum.YEAR.equals(type)) {
                // the year
                return calendar.get(Calendar.YEAR);
            } else if (DateEnum.SEASON.equals(type)) {
                // the quarter
                int month = calendar.get(Calendar.MONTH) + 1;
                if (month % 3 == 0) {
                    return month / 3;
                }
                return month / 3 + 1;
            } else if (DateEnum.MONTH.equals(type)) {
                // the month
                return calendar.get(Calendar.MONTH) + 1;
            } else if (DateEnum.WEEK.equals(type)) {
                // the week of year
                return calendar.get(Calendar.WEEK_OF_YEAR);
            } else if (DateEnum.DAY.equals(type)) {
                return calendar.get(Calendar.DAY_OF_MONTH);
            } else if (DateEnum.HOUR.equals(type)) {
                return calendar.get(Calendar.HOUR_OF_DAY);
            }
            throw new RuntimeException("Unsupported time type: " + type);
        }
    
        /**
         * Get the timestamp of the first day of the week containing the given time.
         * 
         * @param time
         * @return
         */
        public static long getFirstDayOfThisWeek(long time) {
            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(time);
            cal.set(Calendar.DAY_OF_WEEK, 1);
            cal.set(Calendar.HOUR_OF_DAY, 0);
            cal.set(Calendar.MINUTE, 0);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            return cal.getTimeInMillis();
        }
    }
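
    A quick sanity check of the main conversions (a hypothetical demo class with access to com.kk.util.TimeUtil; the printed date assumes a UTC+8 environment):

    public class TimeUtilDemo {
        public static void main(String[] args) {
            long ts = TimeUtil.parseNginxServerTime2Long("1449410796.976");
            System.out.println(ts);                                           // 1449410796976
            System.out.println(TimeUtil.parseLong2String(ts));                // 2015-12-06
            System.out.println(TimeUtil.isValidateRunningDate("2015-12-06")); // true
        }
    }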


    3. Write the mapper and runner classes

    mapper

    package com.kk.etl.util.mr.ald;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.zip.CRC32;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.log4j.Logger;
    
    import com.kk.common.EventLogConstants;
    import com.kk.common.EventLogConstants.EventEnum;
    import com.kk.etl.util.LoggerUtils;
    
    /**
     * Custom map class that parses the log data.
     * 
     * @author gerry
     *
     */
    public class AnalyserLogDataMapper extends Mapper<Object, Text, NullWritable, Put> {
        
        private final Logger logger = Logger.getLogger(AnalyserLogDataMapper.class);
        private int inputRecords, filterRecords, outputRecords; // counters used to track how many records are read, filtered, and written
        private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
        private CRC32 crc32 = new CRC32();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            this.inputRecords++;
            this.logger.debug("Analyse data of :" + value);

            try {
                // parse the log line
                Map<String, String> clientInfo = LoggerUtils.handleLog(value.toString());

                // filter out data that failed to parse
                if (clientInfo.isEmpty()) {
                    this.filterRecords++;
                    return;
                }

                // get the event name; guard against unknown aliases, for which
                // valueOfAlias returns null
                String eventAliasName = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
                EventEnum event = EventEnum.valueOfAlias(eventAliasName);
                if (event == null) {
                    this.filterRecords++;
                    this.logger.warn("Cannot parse this event, event name: " + eventAliasName);
                    return;
                }
                switch (event) {
                case LAUNCH:
                case PAGEVIEW:
                case CHARGEREQUEST:
                case CHARGEREFUND:
                case CHARGESUCCESS:
                case EVENT:
                    // process the data
                    this.handleData(clientInfo, event, context);
                    break;
                default:
                    this.filterRecords++;
                    this.logger.warn("Cannot parse this event, event name: " + eventAliasName);
                }
            } catch (Exception e) {
                this.filterRecords++;
                this.logger.error("Exception while processing data: " + value, e);
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            logger.info("Input records: " + this.inputRecords + "; output records: " + this.outputRecords + "; filtered records: " + this.filterRecords);
        }
    
        /**
         * Do the actual processing of one record.
         * 
         * @param clientInfo
         * @param event
         * @param context
         * @throws InterruptedException
         * @throws IOException
         */
        private void handleData(Map<String, String> clientInfo, EventEnum event, Context context) throws IOException, InterruptedException {
            String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
            String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
            String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
            if (StringUtils.isNotBlank(serverTime)) {
                // the server time must not be blank
                clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the raw userAgent string
                // rowkey = timestamp + (uuid+memberid+event).crc
                String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime);
                Put put = new Put(Bytes.toBytes(rowkey));
                for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
                    if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                        put.add(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                    }
                }
                context.write(NullWritable.get(), put);
                this.outputRecords++;
            } else {
                this.filterRecords++;
            }
        }
    
        /**
         * Build the rowkey from the uuid, memberId, event alias, and serverTime.
         * 
         * @param uuid
         * @param memberId
         * @param eventAliasName
         * @param serverTime
         * @return
         */
        private String generateRowKey(String uuid, String memberId, String eventAliasName, String serverTime) {
            StringBuilder sb = new StringBuilder();
            sb.append(serverTime).append("_");
            this.crc32.reset();
            if (StringUtils.isNotBlank(uuid)) {
                this.crc32.update(uuid.getBytes());
            }
            if (StringUtils.isNotBlank(memberId)) {
                this.crc32.update(memberId.getBytes());
            }
            this.crc32.update(eventAliasName.getBytes());
            sb.append(this.crc32.getValue() % 100000000L);
            return sb.toString();
        }
    }

    runner

    package com.kk.etl.util.mr.ald;
    import java.io.File;
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    import com.kk.common.EventLogConstants;
    import com.kk.common.GlobalConstants;
    import com.kk.util.EJob;
    import com.kk.util.TimeUtil;
    
    public class AnalyserLogDataRunner implements Tool{
        private static final Logger logger = Logger.getLogger(AnalyserLogDataRunner.class);
        private Configuration conf = null;
    
        public static void main(String[] args) {
            try {
                ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
            } catch (Exception e) {
                logger.error("Exception while running the log-parsing job", e);
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void setConf(Configuration conf) {
            this.conf = HBaseConfiguration.create(conf);
        }
    
        @Override
        public Configuration getConf() {
            return this.conf;
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = this.getConf();
            this.processArgs(conf, args);
            conf.set("fs.defaultFS", "hdfs://hadoop-001:9000");
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("yarn.resourcemanager.hostname", "hadoop-002");

            Job job = Job.getInstance(conf, "analyser_logdata");

            // code needed to submit the job locally and run it on the cluster -- start
            File jarFile = EJob.createTempJar("target/classes");
            JobConf jobConf = (JobConf) job.getConfiguration();
            jobConf.setJar(jarFile.toString());
            // code needed to submit the job locally and run it on the cluster -- end

            job.setJarByClass(AnalyserLogDataRunner.class);
            job.setMapperClass(AnalyserLogDataMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Put.class);

            // configure the reducer stage
            // 1. running on the cluster from a jar (requires addDependencyJars to be true, which is the default)
            conf.set("addDependencyJars", "true");
            TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
            // 2. running locally (requires addDependencyJars to be false)
            // conf.set("addDependencyJars", "false");
            // TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,
            //         null, job, null, null, null, null, false);
            job.setNumReduceTasks(0);

            // set the input paths
            this.setJobInputPaths(job);
            return job.waitForCompletion(true) ? 0 : -1;
        }
    
        /**
         * Parse the command-line arguments.
         * 
         * @param conf
         * @param args
         */
        private void processArgs(Configuration conf, String[] args) {
            String date = null;
            for (int i = 0; i < args.length; i++) {
                if ("-d".equals(args[i])) {
                    if (i + 1 < args.length) {
                        date = args[++i];
                        break;
                    }
                }
            }

            // the date must be in yyyy-MM-dd format
            if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
                // date is invalid; default to yesterday
                date = TimeUtil.getYesterday();
            }
            conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
        }
    
        /**
         * Set the job's input path.
         * 
         * @param job
         */
        private void setJobInputPaths(Job job) {
            Configuration conf = job.getConfiguration();
            FileSystem fs = null;
            try {
                fs = FileSystem.get(conf);
                String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
                Path inputPath = new Path("/logs/nginx/" + TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "MM/dd"));//+"/BF-01.1553792641215");
                if (fs.exists(inputPath)) {
                    FileInputFormat.addInputPath(job, inputPath);
                } else {
                    throw new RuntimeException("Input path does not exist: " + inputPath);
                }
            } catch (IOException e) {
                throw new RuntimeException("Exception while setting the job's mapreduce input path", e);
            } finally {
                if (fs != null) {
                    try {
                        fs.close();
                    } catch (IOException e) {
                        // nothing
                    }
                }
            }
        }
    
    }


    4. Add the environment configuration files core-site.xml, hbase-site.xml, and log4j.properties. Depending on how the job will run, modify them accordingly and put the modified files in the project.
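
    For reference, a minimal hbase-site.xml sketch; the ZooKeeper quorum below assumes the hadoop-001/hadoop-002 hostnames used elsewhere in this post, so adjust it to your cluster:

    <?xml version="1.0"?>
    <configuration>
        <!-- where the HBase client finds ZooKeeper; hostnames are assumptions -->
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>hadoop-001,hadoop-002</value>
        </property>
    </configuration>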


    5. Add the build configuration to the pom and test.
    Local test run: note that on Windows the NativeIO access method call may throw an exception; if so, patch Hadoop's NativeIO.java source file.
    If an exception like the following appears when using TableMapReduceUtil:


    *****/htrace-core-2.04.jar from hdfs://***/htrace-core-2.04.jar is not a valid DFS filename.


    then the addDependencyJars parameter must be set to false.
    Submitting the job locally, running it on the cluster:
    The local client needs to know that the job is to be submitted to the cluster, so set the two parameters mapreduce.framework.name and yarn.resourcemanager.address to yarn and hh:8032 respectively. If an exception still shows up, additionally set mapreduce.app-submission.cross-platform to true.

    Parameter settings:


    mapreduce.framework.name=yarn
    yarn.resourcemanager.address=hadoop-001:8032
    mapreduce.app-submission.cross-platform=true
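
    The same settings can also be applied in code before the Job is created, for example inside the runner's run() method (a sketch mirroring the code above):

    Configuration conf = this.getConf();
    // submit from the IDE, run on the cluster
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.address", "hadoop-001:8032");
    // avoids "/bin/bash: line 0: fg: no job control" when submitting from Windows
    conf.set("mapreduce.app-submission.cross-platform", "true");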




    Exceptions:
    1. Permission denied: user=gerry, access=EXECUTE, inode="/tmp":hadoop:supergroup:drwx------
    Solution: run


    hdfs dfs -chmod -R 777 /tmp




    2. Stack trace: ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
    Solution:


    Set mapreduce.app-submission.cross-platform=true




    3. ExitCodeException exitCode=1:
    Solution:


    When HBase is specified as the reducer output, the addDependencyJars parameter must be set to true.




    4. Class com.beifeng.etl.mr.ald.AnalyserLogDataMapper not found
    Solution:


    Add the EJob.java file to the project, then add the following code in the runner class:
    File jarFile = EJob.createTempJar("target/classes");
    ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
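
    EJob.java itself is not included in this post. Below is a minimal sketch of what createTempJar needs to do, namely pack the compiled classes under target/classes into a temporary jar that is shipped with the job (the helper actually used by the course may differ in details):

    package com.kk.util;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.jar.JarEntry;
    import java.util.jar.JarOutputStream;
    import java.util.jar.Manifest;

    public class EJob {

        /** Pack everything under the given root directory into a temp jar. */
        public static File createTempJar(String root) throws IOException {
            File rootDir = new File(root);
            if (!rootDir.exists()) {
                return null;
            }
            Manifest manifest = new Manifest();
            manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
            File jarFile = File.createTempFile("EJob-", ".jar",
                    new File(System.getProperty("java.io.tmpdir")));
            jarFile.deleteOnExit();
            try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest)) {
                addEntries(out, rootDir, "");
            }
            return jarFile;
        }

        // Recursively add every file below "f" to the jar, keeping relative paths.
        private static void addEntries(JarOutputStream out, File f, String base) throws IOException {
            if (f.isDirectory()) {
                if (base.length() > 0) {
                    base = base + "/";
                }
                for (File child : f.listFiles()) {
                    addEntries(out, child, base + child.getName());
                }
            } else {
                out.putNextEntry(new JarEntry(base));
                try (FileInputStream in = new FileInputStream(f)) {
                    byte[] buffer = new byte[1024];
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        out.write(buffer, 0, n);
                    }
                }
            }
        }
    }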



    Submitting and running the job on the cluster:
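
    For example, after packaging the project (the jar name and date below are placeholders):

    hadoop jar etl.jar com.kk.etl.util.mr.ald.AnalyserLogDataRunner -d 2019-04-17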
