  • [E-commerce Log Project, Part 5] Data Analysis with MapReduce

    Environment
      hadoop-2.6.5
      hbase-0.98.12.1-hadoop2

    New Install User Metric Analysis
    (1) User analysis module


    (2) Browser analysis module

    Based on the target report charts, identify the dimensions of the analysis:
    User analysis tracks how the user count changes over a time period, so browser user analysis is how the count for a given browser changes over a time period. From the available conditions we can fix the categories to aggregate by. For example:

    User basic-information module: new users (time)
    Browser analysis module: new users (time, browser info)
    2018-08-10 www.bjsxt.com zhangsan firefox-48
    2018-08-10 www.bjsxt.com lisi firefox-53

    MR
    map:
    2018-08-10              zhangsan
    2018-08-10,firefox-48   zhangsan
    2018-08-10,firefox-all  zhangsan

    2018-08-10              lisi
    2018-08-10,firefox-53   lisi
    2018-08-10,firefox-all  lisi

    reduce:
    2018-08-10              {zhangsan, lisi}  -> 2

    2018-08-10,firefox-48   {zhangsan}        -> 1

    2018-08-10,firefox-53   {lisi}            -> 1

    2018-08-10,firefox-all  {zhangsan, lisi}  -> 2


    Treat the module itself as one of the key conditions (a minimal sketch of such a composite key follows the listing below):

    time, user basic-information module
    time, browser, browser analysis module

    2018-08-10 www.bjsxt.com zhangsan firefox-48
    2018-08-10 www.bjsxt.com lisi firefox-53

    map:
    2018-08-10,user                 zhangsan
    2018-08-10,firefox-48,browser   zhangsan
    2018-08-10,firefox-all,browser  zhangsan

    2018-08-10,user                 lisi
    2018-08-10,firefox-53,browser   lisi
    2018-08-10,firefox-all,browser  lisi

    reduce:
    2018-08-10,user                 {zhangsan, lisi}  -> 2

    2018-08-10,firefox-48,browser   {zhangsan}        -> 1

    2018-08-10,firefox-53,browser   {lisi}            -> 1

    2018-08-10,firefox-all,browser  {zhangsan, lisi}  -> 2
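
    To make the "module as part of the key" idea concrete, below is a minimal sketch of a composite map-output key carrying the date, browser, and module (KPI) fields from the listing above. It only illustrates the grouping; the project itself uses StatsUserDimension (shown in the Runner and Mapper code below), and the class and field names here are hypothetical.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Simplified composite key: date + browser + module (KPI) name.
     * Rows for the user module leave the browser field empty.
     */
    public class SimpleStatsKey implements WritableComparable<SimpleStatsKey> {
        private String date = "";     // e.g. 2018-08-10
        private String browser = "";  // e.g. firefox-48, firefox-all, or "" for the user module
        private String kpi = "";      // e.g. new_install_user / browser_new_install_user
    
        public SimpleStatsKey() {
        }
    
        public SimpleStatsKey(String date, String browser, String kpi) {
            this.date = date;
            this.browser = browser;
            this.kpi = kpi;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(date);
            out.writeUTF(browser);
            out.writeUTF(kpi);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            date = in.readUTF();
            browser = in.readUTF();
            kpi = in.readUTF();
        }
    
        @Override
        public int compareTo(SimpleStatsKey o) {
            int r = date.compareTo(o.date);
            if (r != 0) return r;
            r = browser.compareTo(o.browser);
            if (r != 0) return r;
            return kpi.compareTo(o.kpi);
        }
    
        @Override
        public boolean equals(Object obj) {
            return obj instanceof SimpleStatsKey && this.compareTo((SimpleStatsKey) obj) == 0;
        }
    
        @Override
        public int hashCode() {
            return (date + "|" + browser + "|" + kpi).hashCode();
        }
    }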

    Runner:

    package com.sxt.transformer.mr.nu;
    
    import java.sql.Connection;
    import java.sql.Date;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;
    
    import com.google.common.collect.Lists;
    import com.sxt.common.DateEnum;
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.EventLogConstants.EventEnum;
    import com.sxt.common.GlobalConstants;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.dim.base.DateDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    import com.sxt.transformer.model.value.reduce.MapWritableValue;
    import com.sxt.transformer.mr.TransformerOutputFormat;
    import com.sxt.util.JdbcManager;
    import com.sxt.util.TimeUtil;
    
    /**
     * Entry (driver) class for computing new install users
     * 
     * @author root
     *
     */
    public class NewInstallUserRunner implements Tool
    {
        private static final Logger logger = Logger.getLogger(NewInstallUserRunner.class);
        private Configuration conf = new Configuration();
    
        /**
         * Main entry method
         * 
         * @param args
         */
        public static void main(String[] args)
        {
            try 
            {
                ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args);
            } 
            catch (Exception e) 
            {
                logger.error("运行计算新用户的job出现异常", e);
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void setConf(Configuration conf) 
        {
            conf.addResource("output-collector.xml");
            conf.addResource("query-mapping.xml");
            conf.addResource("transformer-env.xml");
            conf.set("fs.defaultFS", "hdfs://node101:8020");//HDFS
    //        conf.set("yarn.resourcemanager.hostname", "node3");
            conf.set("hbase.zookeeper.quorum", "node104");//HBase
            this.conf = HBaseConfiguration.create(conf);
        }
    
        @Override
        public Configuration getConf() {
            return this.conf;
        }
    
        @Override
        public int run(String[] args) throws Exception
        {
            Configuration conf = this.getConf();
            // Process command-line arguments
            this.processArgs(conf, args);
    
            Job job = Job.getInstance(conf, "new_install_user");
    
            job.setJarByClass(NewInstallUserRunner.class);
            // Local run
            TableMapReduceUtil.initTableMapperJob(
                    initScans(job), 
                    NewInstallUserMapper.class, 
                    StatsUserDimension.class, 
                    TimeOutputValue.class, 
                    job, 
                    false);
            // Cluster run: submit locally or as a packaged jar
    //      TableMapReduceUtil.initTableMapperJob(initScans(job), NewInstallUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job);
            job.setReducerClass(NewInstallUserReducer.class);
            job.setOutputKeyClass(StatsUserDimension.class);// the dimensions form the key
            job.setOutputValueClass(MapWritableValue.class);
    //        job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TransformerOutputFormat.class);// custom OutputFormat that writes to MySQL
            if (job.waitForCompletion(true)) {
                // Job succeeded; now compute the running totals
                this.calculateTotalUsers(conf);
                return 0;
            } 
            else 
            {
                return -1;
            }
        }
    
        /**
         * Compute the total users:
         * query yesterday's and today's statistics, add them up, and record the latest totals
         * @param conf
         */
        private void calculateTotalUsers(Configuration conf) {
            Connection conn = null;
            PreparedStatement pstmt = null;
            ResultSet rs = null;
    
            try {
                long date = TimeUtil.parseString2Long(conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
                // Get today's date dimension
                DateDimension todayDimension = DateDimension.buildDate(date, DateEnum.DAY);
                // Get yesterday's date dimension
                DateDimension yesterdayDimension = DateDimension.buildDate(date - GlobalConstants.DAY_OF_MILLISECONDS, DateEnum.DAY);
                int yesterdayDimensionId = -1;
                int todayDimensionId = -1;
    
                // 1. Look up the date dimension ids
                conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
                // id of the day before the running date
                pstmt = conn.prepareStatement("SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?");
                int i = 0;
                pstmt.setInt(++i, yesterdayDimension.getYear());
                pstmt.setInt(++i, yesterdayDimension.getSeason());
                pstmt.setInt(++i, yesterdayDimension.getMonth());
                pstmt.setInt(++i, yesterdayDimension.getWeek());
                pstmt.setInt(++i, yesterdayDimension.getDay());
                pstmt.setString(++i, yesterdayDimension.getType());
                pstmt.setDate(++i, new Date(yesterdayDimension.getCalendar().getTime()));
                rs = pstmt.executeQuery();
                if (rs.next()) {
                    yesterdayDimensionId = rs.getInt(1);
                }
    
                // id of the running date itself
                pstmt = conn.prepareStatement("SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?");
                i = 0;
                pstmt.setInt(++i, todayDimension.getYear());
                pstmt.setInt(++i, todayDimension.getSeason());
                pstmt.setInt(++i, todayDimension.getMonth());
                pstmt.setInt(++i, todayDimension.getWeek());
                pstmt.setInt(++i, todayDimension.getDay());
                pstmt.setString(++i, todayDimension.getType());
                pstmt.setDate(++i, new Date(todayDimension.getCalendar().getTime()));
                rs = pstmt.executeQuery();
                if (rs.next()) {
                    todayDimensionId = rs.getInt(1);
                }
    
                // 2. Fetch yesterday's data, stored as platformId -> totalUsers
                Map<String, Integer> oldValueMap = new HashMap<String, Integer>();
    
                // Update stats_user
                if (yesterdayDimensionId > -1) {
                    pstmt = conn.prepareStatement("select `platform_dimension_id`,`total_install_users` from `stats_user` where `date_dimension_id`=?");
                    pstmt.setInt(1, yesterdayDimensionId);
                    rs = pstmt.executeQuery();
                    while (rs.next()) {
                        int platformId = rs.getInt("platform_dimension_id");
                        int totalUsers = rs.getInt("total_install_users");
                        oldValueMap.put("" + platformId, totalUsers);
                    }
                }
    
                // Add today's new users to the totals
                pstmt = conn.prepareStatement("select `platform_dimension_id`,`new_install_users` from `stats_user` where `date_dimension_id`=?");
                pstmt.setInt(1, todayDimensionId);
                rs = pstmt.executeQuery();
                while (rs.next()) {
                    int platformId = rs.getInt("platform_dimension_id");
                    int newUsers = rs.getInt("new_install_users");
                    if (oldValueMap.containsKey("" + platformId)) {
                        newUsers += oldValueMap.get("" + platformId);
                    }
                    oldValueMap.put("" + platformId, newUsers);
                }
    
                // Upsert the new totals
                pstmt = conn.prepareStatement("INSERT INTO `stats_user`(`platform_dimension_id`,`date_dimension_id`,`total_install_users`) VALUES(?, ?, ?) ON DUPLICATE KEY UPDATE `total_install_users` = ?");
                for (Map.Entry<String, Integer> entry : oldValueMap.entrySet()) {
                    pstmt.setInt(1, Integer.valueOf(entry.getKey()));
                    pstmt.setInt(2, todayDimensionId);
                    pstmt.setInt(3, entry.getValue());
                    pstmt.setInt(4, entry.getValue());
                    pstmt.execute();
                }
    
                // Update stats_device_browser
                oldValueMap.clear();
                if (yesterdayDimensionId > -1) {
                    pstmt = conn.prepareStatement("select `platform_dimension_id`,`browser_dimension_id`,`total_install_users` from `stats_device_browser` where `date_dimension_id`=?");
                    pstmt.setInt(1, yesterdayDimensionId);
                    rs = pstmt.executeQuery();
                    while (rs.next()) {
                        int platformId = rs.getInt("platform_dimension_id");
                        int browserId = rs.getInt("browser_dimension_id");
                        int totalUsers = rs.getInt("total_install_users");
                        oldValueMap.put(platformId + "_" + browserId, totalUsers);
                    }
                }
    
                // Add today's new users to the totals
                pstmt = conn.prepareStatement("select `platform_dimension_id`,`browser_dimension_id`,`new_install_users` from `stats_device_browser` where `date_dimension_id`=?");
                pstmt.setInt(1, todayDimensionId);
                rs = pstmt.executeQuery();
                while (rs.next()) {
                    int platformId = rs.getInt("platform_dimension_id");
                    int browserId = rs.getInt("browser_dimension_id");
                    int newUsers = rs.getInt("new_install_users");
                    String key = platformId + "_" + browserId;
                    if (oldValueMap.containsKey(key)) {
                        newUsers += oldValueMap.get(key);
                    }
                    oldValueMap.put(key, newUsers);
                }
    
                // Upsert the new totals
                pstmt = conn.prepareStatement("INSERT INTO `stats_device_browser`(`platform_dimension_id`,`browser_dimension_id`,`date_dimension_id`,`total_install_users`) VALUES(?, ?, ?, ?) ON DUPLICATE KEY UPDATE `total_install_users` = ?");
                for (Map.Entry<String, Integer> entry : oldValueMap.entrySet()) {
                    String[] key = entry.getKey().split("_");
                    pstmt.setInt(1, Integer.valueOf(key[0]));
                    pstmt.setInt(2, Integer.valueOf(key[1]));
                    pstmt.setInt(3, todayDimensionId);
                    pstmt.setInt(4, entry.getValue());
                    pstmt.setInt(5, entry.getValue());
                    pstmt.execute();
                }
                
            } 
            catch (SQLException e) 
            {
                e.printStackTrace();
            } 
            finally 
            {
                // Close the JDBC resources
                try { if (rs != null) rs.close(); } catch (SQLException e) { /* nothing */ }
                try { if (pstmt != null) pstmt.close(); } catch (SQLException e) { /* nothing */ }
                try { if (conn != null) conn.close(); } catch (SQLException e) { /* nothing */ }
            }
        }
    
        /**
         * Process command-line arguments
         * 
         * @param conf
         * @param args
         */
        private void processArgs(Configuration conf, String[] args) {
            String date = null;
            for (int i = 0; i < args.length; i++) {
                if ("-d".equals(args[i])) {
                    if (i + 1 < args.length) {
                        date = args[++i];
                        break;
                    }
                }
            }
    
            // date must be in yyyy-MM-dd format
            if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
                // date is missing or invalid
                date = TimeUtil.getYesterday(); // default to yesterday
            }
            System.out.println("----------------------" + date);
            conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
        }
    
        /**
         * Initialize the list of HBase scans
         * 
         * @param job
         * @return
         */
        private List<Scan> initScans(Job job) {
            // rowkey starts with a timestamp + ...
            Configuration conf = job.getConfiguration();
            // Get the running date: yyyy-MM-dd
            String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
            long startDate = TimeUtil.parseString2Long(date);
            long endDate = startDate + GlobalConstants.DAY_OF_MILLISECONDS;
    
            Scan scan = new Scan();
            // Set the start and stop rowkeys of the HBase scan
            scan.setStartRow(Bytes.toBytes("" + startDate));
            scan.setStopRow(Bytes.toBytes("" + endDate));
            
            FilterList filterList = new FilterList();
            // Filter the data: only analyze launch events
            filterList.addFilter(new SingleColumnValueFilter(
                    Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
                    Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), 
                    CompareOp.EQUAL, 
                    Bytes.toBytes(EventEnum.LAUNCH.alias)));
            // Column names the mapper needs to read
            String[] columns = new String[] {
                        EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME, 
                        EventLogConstants.LOG_COLUMN_NAME_UUID, 
                        EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, 
                        EventLogConstants.LOG_COLUMN_NAME_PLATFORM, 
                        EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, 
                        EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION };
    //        scan.addColumn(family, qualifier)
            filterList.addFilter(this.getColumnFilter(columns));
    
            scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, 
                    Bytes.toBytes(EventLogConstants.HBASE_NAME_EVENT_LOGS));
            scan.setFilter(filterList);
            return Lists.newArrayList(scan);
        }
    
        /**
         * Build a column filter for the given column names
         * 
         * @param columns
         * @return
         */
        private Filter getColumnFilter(String[] columns) {
            int length = columns.length;
            byte[][] filter = new byte[length][];
            for (int i = 0; i < length; i++) {
                filter[i] = Bytes.toBytes(columns[i]);
            }
            return new MultipleColumnPrefixFilter(filter);
        }
    }
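
    To run the Runner, the job is submitted with the -d parameter handled by processArgs (format yyyy-MM-dd; if it is missing or invalid the date defaults to yesterday). A sketch of a possible submission, assuming the project is packaged as transformer.jar (the jar name is hypothetical):

    hadoop jar transformer.jar com.sxt.transformer.mr.nu.NewInstallUserRunner -d 2018-08-10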

    Mapper:

    package com.sxt.transformer.mr.nu;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.log4j.Logger;
    
    import com.sxt.common.DateEnum;
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.KpiType;
    import com.sxt.transformer.model.dim.StatsCommonDimension;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.dim.base.BrowserDimension;
    import com.sxt.transformer.model.dim.base.DateDimension;
    import com.sxt.transformer.model.dim.base.KpiDimension;
    import com.sxt.transformer.model.dim.base.PlatformDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    
    /**
     * Custom mapper for computing new install users
     * 
     * @author root
     *
     */
    public class NewInstallUserMapper extends TableMapper<StatsUserDimension, TimeOutputValue> {
        // Each analysis condition (a combination of dimensions) is the key; the uuid is the value
       
        private static final Logger logger = Logger.getLogger(NewInstallUserMapper.class);
    
        private StatsUserDimension statsUserDimension = new StatsUserDimension();
        private TimeOutputValue timeOutputValue = new TimeOutputValue();
        
        private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
        
        // KPI for the user analysis module
        private KpiDimension newInstallUserKpi = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
        // KPI for the browser analysis module
        private KpiDimension newInstallUserOfBrowserKpi = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);
    
        /**
         * map reads data from HBase; each input record is one row of the HBase table.
         * Output key type: StatsUserDimension
         * Output value type: TimeOutputValue
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)  throws IOException, InterruptedException 
        {
            String uuid = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
            String serverTime = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)));
            String platform = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
            
            System.out.println(uuid + "-" + serverTime + "-" + platform);
            
            if (StringUtils.isBlank(uuid) || StringUtils.isBlank(serverTime) || StringUtils.isBlank(platform)) {
                logger.warn("uuid&servertime&platform不能为空");
                return;
            }
            
            long longOfTime = Long.valueOf(serverTime.trim());
            timeOutputValue.setId(uuid); // id = uuid
            timeOutputValue.setTime(longOfTime); // time = server time
            
            // Build the date dimension
            DateDimension dateDimension = DateDimension.buildDate(longOfTime, DateEnum.DAY);
            StatsCommonDimension statsCommonDimension = this.statsUserDimension.getStatsCommon();
            statsCommonDimension.setDate(dateDimension);
    
            List<PlatformDimension> platformDimensions = PlatformDimension.buildList(platform);
            
            // Browser-related data
            String browserName = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)));
            String browserVersion = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)));
            List<BrowserDimension> browserDimensions = BrowserDimension.buildList(browserName, browserVersion);
           
            // Empty browser dimension, used when the browser dimension is not considered
            BrowserDimension defaultBrowser = new BrowserDimension("", "");
            for (PlatformDimension pf : platformDimensions) {
                // User analysis
                statsUserDimension.setBrowser(defaultBrowser);
                statsCommonDimension.setKpi(newInstallUserKpi);
                statsCommonDimension.setPlatform(pf);
                context.write(statsUserDimension, timeOutputValue);
                // Browser user analysis
                for (BrowserDimension br : browserDimensions) {
                    statsCommonDimension.setKpi(newInstallUserOfBrowserKpi);
                    statsUserDimension.setBrowser(br);
                    context.write(statsUserDimension, timeOutputValue);
                }
            }
        }
    }
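
    For a single launch event from the sample log above (2018-08-10, zhangsan, firefox-48), and assuming BrowserDimension.buildList returns both a version-specific and an "all versions" dimension (as the firefox-all rows in the earlier listing suggest), the mapper emits, for each platform dimension, roughly:

    (2018-08-10, platform, new_install_user,         browser="")          -> zhangsan
    (2018-08-10, platform, browser_new_install_user, browser=firefox-48)  -> zhangsan
    (2018-08-10, platform, browser_new_install_user, browser=firefox-all) -> zhangsan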

    Reducer:

    package com.sxt.transformer.mr.nu;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import com.sxt.common.KpiType;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    import com.sxt.transformer.model.value.reduce.MapWritableValue;
    
    /**
     * Reducer for computing new install users
     * 
     * @author root
     *
     */
    public class NewInstallUserReducer extends Reducer<StatsUserDimension, TimeOutputValue, StatsUserDimension, MapWritableValue> {
        private MapWritableValue outputValue = new MapWritableValue();
        private Set<String> unique = new HashSet<String>();
    
        @Override
        protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> values, Context context) throws IOException, InterruptedException {
            this.unique.clear();
    
            // Count the number of distinct uuids
            for (TimeOutputValue value : values) {
                this.unique.add(value.getId());// uuid (user id); the HashSet deduplicates
            }
            
            MapWritable map = new MapWritable();// the Writable counterpart of a Java HashMap
            map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
            outputValue.setValue(map);
    
            // Set the KPI (module) name, which tells the output layer which table to insert into
            String kpiName = key.getStatsCommon().getKpi().getKpiName();
            if (KpiType.NEW_INSTALL_USER.name.equals(kpiName)) {
                // New install users for the stats_user table
                outputValue.setKpi(KpiType.NEW_INSTALL_USER);
            } else if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)) {
                // New install users for the stats_device_browser table
                outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
            }
            context.write(key, outputValue);
        }
    }
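
    Tracing the earlier sample through the reducer: for the key (2018-08-10, platform, new_install_user, browser="") it receives {zhangsan, lisi}, so it writes roughly

    key   = (2018-08-10, platform, new_install_user, browser="")
    value = MapWritableValue{ kpi = NEW_INSTALL_USER, map = { -1 -> 2 } }

    where the -1 slot simply carries the distinct-uuid count; the custom OutputFormat below routes the record to the stats_user table based on the KPI.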

    OutputFormat class that writes to MySQL:

    package com.sxt.transformer.mr;
    
    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.Logger;
    
    import com.sxt.common.GlobalConstants;
    import com.sxt.common.KpiType;
    import com.sxt.transformer.model.dim.base.BaseDimension;
    import com.sxt.transformer.model.value.BaseStatsValueWritable;
    import com.sxt.transformer.service.IDimensionConverter;
    import com.sxt.transformer.service.impl.DimensionConverterImpl;
    import com.sxt.util.JdbcManager;
    
    /**
     * Custom OutputFormat that writes to MySQL.
     * BaseDimension: key emitted by the reducer
     * BaseStatsValueWritable: value emitted by the reducer
     * @author root
     *
     */
    public class TransformerOutputFormat extends OutputFormat<BaseDimension, BaseStatsValueWritable> {
        private static final Logger logger = Logger.getLogger(TransformerOutputFormat.class);
    
        /**
         * Defines how each record is written; one record is the data written by the reducer on each call to write().
         */
        @Override
        public RecordWriter<BaseDimension, BaseStatsValueWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            Connection conn = null;
            IDimensionConverter converter = new DimensionConverterImpl();
            try {
                conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
                conn.setAutoCommit(false);
            } catch (SQLException e) {
                logger.error("获取数据库连接失败", e);
                throw new IOException("获取数据库连接失败", e);
            }
            return new TransformerRecordWriter(conn, conf, converter);
        }
    
        @Override
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
            // Check the output spec; nothing to check when writing to MySQL
        }
    
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
            return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
        }
    
        /**
         * Custom record writer that performs the actual output
         * 
         * @author root
         *
         */
        public class TransformerRecordWriter extends RecordWriter<BaseDimension, BaseStatsValueWritable> {
            private Connection conn = null;
            private Configuration conf = null;
            private IDimensionConverter converter = null;
            private Map<KpiType, PreparedStatement> map = new HashMap<KpiType, PreparedStatement>();
            private Map<KpiType, Integer> batch = new HashMap<KpiType, Integer>();
    
            public TransformerRecordWriter(Connection conn, Configuration conf, IDimensionConverter converter) {
                super();
                this.conn = conn;
                this.conf = conf;
                this.converter = converter;
            }
    
            /**
             * Called by the framework whenever the reduce task emits a record; writes the reducer output to MySQL.
             */
            @Override
            public void write(BaseDimension key, BaseStatsValueWritable value) throws IOException, InterruptedException {
                if (key == null || value == null) {
                    return;
                }
    
                try {
                    KpiType kpi = value.getKpi();
                    PreparedStatement pstmt = null;// each PreparedStatement corresponds to one SQL statement (one per KPI)
                    int count = 1;// batch counter for this KPI's SQL; the batch is flushed every JDBC_BATCH_NUMBER records
                    if (map.get(kpi) == null) 
                    {
                        // Keyed by KPI; the SQL statement for each KPI is read from the configuration
                        pstmt = this.conn.prepareStatement(conf.get(kpi.name));
                        map.put(kpi, pstmt);
                    } 
                    else 
                    {
                        pstmt = map.get(kpi);
                        count = batch.get(kpi);
                        count++;
                    }
                    batch.put(kpi, count); // remember the batch count
    
                    String collectorName = conf.get(GlobalConstants.OUTPUT_COLLECTOR_KEY_PREFIX + kpi.name);
                    Class<?> clazz = Class.forName(collectorName);
                    IOutputCollector collector = (IOutputCollector) clazz.newInstance();// the collector inserts the value into MySQL; different KPI dimensions go to different tables
                    collector.collect(conf, key, value, pstmt, converter);
    
                    if (count % Integer.valueOf(conf.get(GlobalConstants.JDBC_BATCH_NUMBER, GlobalConstants.DEFAULT_JDBC_BATCH_NUMBER)) == 0) {
                        pstmt.executeBatch();
                        conn.commit();
                        batch.put(kpi, 0); // reset the batch counter after flushing
                    }
                }
                catch (Throwable e)
                {
                    logger.error("在writer中写数据出现异常", e);
                    throw new IOException(e);
                }
            }
    
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                try {
                    for (Map.Entry<KpiType, PreparedStatement> entry : this.map.entrySet()) {
                        entry.getValue().executeBatch();
                    }
                } catch (SQLException e) {
                    logger.error("执行executeUpdate方法异常", e);
                    throw new IOException(e);
                } finally {
                    try {
                        if (conn != null) {
                            conn.commit(); // commit the connection
                        }
                    } catch (Exception e) {
                        // nothing
                    } finally {
                        for (Map.Entry<KpiType, PreparedStatement> entry : this.map.entrySet()) {
                            try {
                                entry.getValue().close();
                            } catch (SQLException e) {
                                // nothing
                            }
                        }
                        if (conn != null)
                            try {
                                conn.close();
                            } catch (Exception e) {
                                // nothing
                            }
                    }
                }
            }
    
        }
    }
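
    The collector classes referenced from output-collector.xml are not shown here. From the call in TransformerRecordWriter.write(), the IOutputCollector contract presumably looks like the sketch below (the declared exception types are an assumption, not the project's actual source). An implementation would resolve dimension ids through the IDimensionConverter, bind the parameters of the SQL configured for its KPI, and call pstmt.addBatch(); write() then flushes the batch and commits every JDBC_BATCH_NUMBER records.

    package com.sxt.transformer.mr;
    
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    import org.apache.hadoop.conf.Configuration;
    
    import com.sxt.transformer.model.dim.base.BaseDimension;
    import com.sxt.transformer.model.value.BaseStatsValueWritable;
    import com.sxt.transformer.service.IDimensionConverter;
    
    /**
     * Sketch of the collector contract inferred from TransformerRecordWriter.write();
     * a reconstruction for illustration, not the project's actual declaration.
     */
    public interface IOutputCollector {
        void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
                PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException;
    }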

    Project code reference: wjy.rar

  • Original post: https://www.cnblogs.com/cac2020/p/11156134.html