Environment
hadoop-2.6.5
hbase-0.98.12.1-hadoop2
New-user metric analysis
(1) User analysis module
(2) Browser analysis module
Based on the mock-up of the analysis report, identify the dimensions of the analysis:
User analysis tracks how the user count changes over a given time period; browser user analysis, accordingly, tracks how the count for a particular browser changes over a time period. From the available conditions we can enumerate the kinds of statistics to compute, for example:
User basic-information module: new users (time)
Browser analysis module: new users (time, browser info)
2018-08-10 www.bjsxt.com zhangsan firefox-48
2018-08-10 www.bjsxt.com lisi firefox-53
MapReduce data flow:
map:
2018-08-10 zhangsan
2018-08-10,firefox-48 zhangsan
2018-08-10,firefox-all zhangsan
2018-08-10 lisi
2018-08-10,firefox-53 lisi
2018-08-10,firefox-all  lisi
reduce (values grouped per key, then distinct users counted):
2018-08-10              [zhangsan, lisi]  2
2018-08-10,firefox-48   [zhangsan]        1
2018-08-10,firefox-53   [lisi]            1
2018-08-10,firefox-all  [zhangsan, lisi]  2
Alternatively, make the module itself part of the key:
time, "user"             -> user basic-information module
time, browser, "browser" -> browser analysis module
2018-08-10 www.bjsxt.com zhangsan firefox-48
2018-08-10 www.bjsxt.com lisi firefox-53
map:
2018-08-10,user                 zhangsan
2018-08-10,firefox-48,browser   zhangsan
2018-08-10,firefox-all,browser  zhangsan
2018-08-10,user                 lisi
2018-08-10,firefox-53,browser   lisi
2018-08-10,firefox-all,browser  lisi
reduce (values grouped per key, then distinct users counted), as sketched in code below:
2018-08-10,user                 [zhangsan, lisi]  2
2018-08-10,firefox-48,browser   [zhangsan]        1
2018-08-10,firefox-53,browser   [lisi]            1
2018-08-10,firefox-all,browser  [zhangsan, lisi]  2
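This second key design is the one the project implements with its dimension classes. Before diving into the real classes, here is a minimal, self-contained sketch of the same idea over the whitespace-separated sample lines above, using plain Hadoop MapReduce; all names in this sketch are illustrative and not part of the project:

package example;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Toy version of the key design: emit one record per (dimension combination, uuid). */
public class NewUserSketch {

    public static class EmitMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // expected line format: date site uuid browser-version (whitespace-separated)
            String[] f = value.toString().split("\\s+");
            if (f.length < 4) return;
            String date = f[0], uuid = f[2], browser = f[3];
            String browserName = browser.split("-")[0];
            // user module: key = (date, "user")
            context.write(new Text(date + ",user"), new Text(uuid));
            // browser module: one key per browser bucket, version-specific and "all"
            context.write(new Text(date + "," + browser + ",browser"), new Text(uuid));
            context.write(new Text(date + "," + browserName + "-all,browser"), new Text(uuid));
        }
    }

    public static class DistinctCountReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // count distinct uuids per dimension combination
            Set<String> unique = new HashSet<String>();
            for (Text v : values) {
                unique.add(v.toString());
            }
            context.write(key, new IntWritable(unique.size()));
        }
    }

    public static void main(String[] args) throws Exception {
        // usage: NewUserSketch <input path> <output path>
        Job job = Job.getInstance(new Configuration(), "new-user-sketch");
        job.setJarByClass(NewUserSketch.class);
        job.setMapperClass(EmitMapper.class);
        job.setReducerClass(DistinctCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}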
Runner:
package com.sxt.transformer.mr.nu;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;
import com.sxt.common.DateEnum;
import com.sxt.common.EventLogConstants;
import com.sxt.common.EventLogConstants.EventEnum;
import com.sxt.common.GlobalConstants;
import com.sxt.transformer.model.dim.StatsUserDimension;
import com.sxt.transformer.model.dim.base.DateDimension;
import com.sxt.transformer.model.value.map.TimeOutputValue;
import com.sxt.transformer.model.value.reduce.MapWritableValue;
import com.sxt.transformer.mr.TransformerOutputFormat;
import com.sxt.util.JdbcManager;
import com.sxt.util.TimeUtil;

/**
 * Entry point (driver) for the new-install-user job.
 *
 * @author root
 */
public class NewInstallUserRunner implements Tool {
    private static final Logger logger = Logger.getLogger(NewInstallUserRunner.class);
    private Configuration conf = new Configuration();

    /**
     * main entry point
     *
     * @param args
     */
    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args);
        } catch (Exception e) {
            logger.error("Running the new-install-user job failed", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        conf.addResource("output-collector.xml");
        conf.addResource("query-mapping.xml");
        conf.addResource("transformer-env.xml");
        conf.set("fs.defaultFS", "hdfs://node101:8020"); // HDFS
        // conf.set("yarn.resourcemanager.hostname", "node3");
        conf.set("hbase.zookeeper.quorum", "node104"); // HBase
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // parse the command-line arguments
        this.processArgs(conf, args);

        Job job = Job.getInstance(conf, "new_install_user");
        job.setJarByClass(NewInstallUserRunner.class);
        // local run
        TableMapReduceUtil.initTableMapperJob(
                initScans(job),
                NewInstallUserMapper.class,
                StatsUserDimension.class,
                TimeOutputValue.class,
                job,
                false);
        // cluster run: submit locally or as a packaged jar
        // TableMapReduceUtil.initTableMapperJob(initScans(job), NewInstallUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job);
        job.setReducerClass(NewInstallUserReducer.class);
        job.setOutputKeyClass(StatsUserDimension.class); // the dimension combination is the key
        job.setOutputValueClass(MapWritableValue.class);
        // job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TransformerOutputFormat.class); // custom OutputFormat that writes to MySQL
        if (job.waitForCompletion(true)) {
            // the job succeeded, so roll up the total user counts
            this.calculateTotalUsers(conf);
            return 0;
        } else {
            return -1;
        }
    }

    /**
     * Calculate total users: read yesterday's and today's statistics,
     * add them up and store the latest totals.
     *
     * @param conf
     */
    private void calculateTotalUsers(Configuration conf) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            long date = TimeUtil.parseString2Long(conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
            // today's date dimension
            DateDimension todayDimension = DateDimension.buildDate(date, DateEnum.DAY);
            // yesterday's date dimension
            DateDimension yesterdayDimension = DateDimension.buildDate(date - GlobalConstants.DAY_OF_MILLISECONDS, DateEnum.DAY);
            int yesterdayDimensionId = -1;
            int todayDimensionId = -1;

            // 1. look up the date dimension ids
            conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
            // id of the day before the run date
            pstmt = conn.prepareStatement("SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?");
            int i = 0;
            pstmt.setInt(++i, yesterdayDimension.getYear());
            pstmt.setInt(++i, yesterdayDimension.getSeason());
            pstmt.setInt(++i, yesterdayDimension.getMonth());
            pstmt.setInt(++i, yesterdayDimension.getWeek());
            pstmt.setInt(++i, yesterdayDimension.getDay());
            pstmt.setString(++i, yesterdayDimension.getType());
            pstmt.setDate(++i, new Date(yesterdayDimension.getCalendar().getTime()));
            rs = pstmt.executeQuery();
            if (rs.next()) {
                yesterdayDimensionId = rs.getInt(1);
            }
            // id of the run date itself
            pstmt = conn.prepareStatement("SELECT `id` FROM `dimension_date` WHERE `year` = ? AND `season` = ? AND `month` = ? AND `week` = ? AND `day` = ? AND `type` = ? AND `calendar` = ?");
            i = 0;
            pstmt.setInt(++i, todayDimension.getYear());
            pstmt.setInt(++i, todayDimension.getSeason());
            pstmt.setInt(++i, todayDimension.getMonth());
            pstmt.setInt(++i, todayDimension.getWeek());
            pstmt.setInt(++i, todayDimension.getDay());
            pstmt.setString(++i, todayDimension.getType());
            pstmt.setDate(++i, new Date(todayDimension.getCalendar().getTime()));
            rs = pstmt.executeQuery();
            if (rs.next()) {
                todayDimensionId = rs.getInt(1);
            }

            // 2. load yesterday's totals, stored as: platformId -> totalUsers
            Map<String, Integer> oldValueMap = new HashMap<String, Integer>();

            // update stats_user
            if (yesterdayDimensionId > -1) {
                pstmt = conn.prepareStatement("select `platform_dimension_id`,`total_install_users` from `stats_user` where `date_dimension_id`=?");
                pstmt.setInt(1, yesterdayDimensionId);
                rs = pstmt.executeQuery();
                while (rs.next()) {
                    int platformId = rs.getInt("platform_dimension_id");
                    int totalUsers = rs.getInt("total_install_users");
                    oldValueMap.put("" + platformId, totalUsers);
                }
            }
            // add today's new users on top
            pstmt = conn.prepareStatement("select `platform_dimension_id`,`new_install_users` from `stats_user` where `date_dimension_id`=?");
            pstmt.setInt(1, todayDimensionId);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                int platformId = rs.getInt("platform_dimension_id");
                int newUsers = rs.getInt("new_install_users");
                if (oldValueMap.containsKey("" + platformId)) {
                    newUsers += oldValueMap.get("" + platformId);
                }
                oldValueMap.put("" + platformId, newUsers);
            }
            // upsert the new totals
            pstmt = conn.prepareStatement("INSERT INTO `stats_user`(`platform_dimension_id`,`date_dimension_id`,`total_install_users`) VALUES(?, ?, ?) ON DUPLICATE KEY UPDATE `total_install_users` = ?");
            for (Map.Entry<String, Integer> entry : oldValueMap.entrySet()) {
                pstmt.setInt(1, Integer.valueOf(entry.getKey()));
                pstmt.setInt(2, todayDimensionId);
                pstmt.setInt(3, entry.getValue());
                pstmt.setInt(4, entry.getValue());
                pstmt.execute();
            }

            // update stats_device_browser
            oldValueMap.clear();
            if (yesterdayDimensionId > -1) {
                pstmt = conn.prepareStatement("select `platform_dimension_id`,`browser_dimension_id`,`total_install_users` from `stats_device_browser` where `date_dimension_id`=?");
                pstmt.setInt(1, yesterdayDimensionId);
                rs = pstmt.executeQuery();
                while (rs.next()) {
                    int platformId = rs.getInt("platform_dimension_id");
                    int browserId = rs.getInt("browser_dimension_id");
                    int totalUsers = rs.getInt("total_install_users");
                    oldValueMap.put(platformId + "_" + browserId, totalUsers);
                }
            }
            // add today's new users on top
            pstmt = conn.prepareStatement("select `platform_dimension_id`,`browser_dimension_id`,`new_install_users` from `stats_device_browser` where `date_dimension_id`=?");
            pstmt.setInt(1, todayDimensionId);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                int platformId = rs.getInt("platform_dimension_id");
                int browserId = rs.getInt("browser_dimension_id");
                int newUsers = rs.getInt("new_install_users");
                String key = platformId + "_" + browserId;
                if (oldValueMap.containsKey(key)) {
                    newUsers += oldValueMap.get(key);
                }
                oldValueMap.put(key, newUsers);
            }
            // upsert the new totals
            pstmt = conn.prepareStatement("INSERT INTO `stats_device_browser`(`platform_dimension_id`,`browser_dimension_id`,`date_dimension_id`,`total_install_users`) VALUES(?, ?, ?, ?) ON DUPLICATE KEY UPDATE `total_install_users` = ?");
            for (Map.Entry<String, Integer> entry : oldValueMap.entrySet()) {
                String[] key = entry.getKey().split("_");
                pstmt.setInt(1, Integer.valueOf(key[0]));
                pstmt.setInt(2, Integer.valueOf(key[1]));
                pstmt.setInt(3, todayDimensionId);
                pstmt.setInt(4, entry.getValue());
                pstmt.setInt(5, entry.getValue());
                pstmt.execute();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Parse the command-line arguments.
     *
     * @param conf
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-d".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[++i];
                    break;
                }
            }
        }
        // the date must be in yyyy-MM-dd format
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // invalid date: default to yesterday
            date = TimeUtil.getYesterday();
        }
        System.out.println("----------------------" + date);
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }

    /**
     * Initialize the scan list. (Rowkeys start with a timestamp + ....)
     *
     * @param job
     * @return
     */
    private List<Scan> initScans(Job job) {
        Configuration conf = job.getConfiguration();
        // run date: yyyy-MM-dd
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
        long startDate = TimeUtil.parseString2Long(date);
        long endDate = startDate + GlobalConstants.DAY_OF_MILLISECONDS;

        Scan scan = new Scan();
        // start and stop rowkeys for the hbase scan
        scan.setStartRow(Bytes.toBytes("" + startDate));
        scan.setStopRow(Bytes.toBytes("" + endDate));

        FilterList filterList = new FilterList();
        // filter the data: only analyze launch events
        filterList.addFilter(new SingleColumnValueFilter(
                Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
                Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME),
                CompareOp.EQUAL,
                Bytes.toBytes(EventEnum.LAUNCH.alias)));
        // the columns the mapper needs to read
        String[] columns = new String[] {
                EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
                EventLogConstants.LOG_COLUMN_NAME_UUID,
                EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
                EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION };
        // scan.addColumn(family, qualifier)
        filterList.addFilter(this.getColumnFilter(columns));

        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(EventLogConstants.HBASE_NAME_EVENT_LOGS));
        scan.setFilter(filterList);
        return Lists.newArrayList(scan);
    }

    /**
     * Build a column-prefix filter for the given column names.
     *
     * @param columns
     * @return
     */
    private Filter getColumnFilter(String[] columns) {
        int length = columns.length;
        byte[][] filter = new byte[length][];
        for (int i = 0; i < length; i++) {
            filter[i] = Bytes.toBytes(columns[i]);
        }
        return new MultipleColumnPrefixFilter(filter);
    }
}
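Since NewInstallUserRunner implements Tool and processArgs understands a -d flag, a typical submission (assuming the project has been packaged into a jar, here hypothetically named project.jar) would look like:

hadoop jar project.jar com.sxt.transformer.mr.nu.NewInstallUserRunner -d 2018-08-10

If -d is missing or the value is not a valid yyyy-MM-dd string, the job falls back to yesterday's date.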
Mapper:
package com.sxt.transformer.mr.nu;

import java.io.IOException;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.sxt.common.DateEnum;
import com.sxt.common.EventLogConstants;
import com.sxt.common.KpiType;
import com.sxt.transformer.model.dim.StatsCommonDimension;
import com.sxt.transformer.model.dim.StatsUserDimension;
import com.sxt.transformer.model.dim.base.BrowserDimension;
import com.sxt.transformer.model.dim.base.DateDimension;
import com.sxt.transformer.model.dim.base.KpiDimension;
import com.sxt.transformer.model.dim.base.PlatformDimension;
import com.sxt.transformer.model.value.map.TimeOutputValue;

/**
 * Custom mapper for computing new users.
 *
 * @author root
 */
public class NewInstallUserMapper extends TableMapper<StatsUserDimension, TimeOutputValue> {
    // each analysis condition (a combination of dimensions) is the key; the uuid is the value
    private static final Logger logger = Logger.getLogger(NewInstallUserMapper.class);

    private StatsUserDimension statsUserDimension = new StatsUserDimension();
    private TimeOutputValue timeOutputValue = new TimeOutputValue();
    private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    // KPI for the user analysis module
    private KpiDimension newInstallUserKpi = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
    // KPI for the browser analysis module
    private KpiDimension newInstallUserOfBrowserKpi = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);

    /**
     * map reads from HBase; the input is one row of the HBase table.
     * Output key type: StatsUserDimension
     * Output value type: TimeOutputValue
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String uuid = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
        String serverTime = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)));
        String platform = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
        System.out.println(uuid + "-" + serverTime + "-" + platform);
        if (StringUtils.isBlank(uuid) || StringUtils.isBlank(serverTime) || StringUtils.isBlank(platform)) {
            logger.warn("uuid, serverTime and platform must not be empty");
            return;
        }
        long longOfTime = Long.valueOf(serverTime.trim());
        timeOutputValue.setId(uuid); // the value's id is the uuid
        timeOutputValue.setTime(longOfTime); // the value's time is the server time
        // build the date dimension
        DateDimension dateDimension = DateDimension.buildDate(longOfTime, DateEnum.DAY);
        StatsCommonDimension statsCommonDimension = this.statsUserDimension.getStatsCommon();
        statsCommonDimension.setDate(dateDimension);
        List<PlatformDimension> platformDimensions = PlatformDimension.buildList(platform);
        // browser-related columns
        String browserName = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)));
        String browserVersion = Bytes.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)));
        List<BrowserDimension> browserDimensions = BrowserDimension.buildList(browserName, browserVersion);
        // empty browser dimension, used when the browser is not part of the analysis
        BrowserDimension defaultBrowser = new BrowserDimension("", "");
        for (PlatformDimension pf : platformDimensions) {
            // user analysis
            statsUserDimension.setBrowser(defaultBrowser);
            statsCommonDimension.setKpi(newInstallUserKpi);
            statsCommonDimension.setPlatform(pf);
            context.write(statsUserDimension, timeOutputValue);
            // browser user analysis
            for (BrowserDimension br : browserDimensions) {
                statsCommonDimension.setKpi(newInstallUserOfBrowserKpi);
                statsUserDimension.setBrowser(br);
                context.write(statsUserDimension, timeOutputValue);
            }
        }
    }
}
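PlatformDimension.buildList and BrowserDimension.buildList produce the fan-out seen in the hand-worked example: one version-specific browser bucket plus one "all" bucket per record. Their source is not reproduced in this post; purely as a hedged sketch inferred from the firefox-48/firefox-all behavior above (the real implementation in the project may differ), buildList plausibly does something like:

// Hypothetical sketch, inferred from the firefox-48 / firefox-all fan-out;
// see the project source (wjy.rar) for the real implementation.
public static List<BrowserDimension> buildList(String browserName, String browserVersion) {
    List<BrowserDimension> list = new ArrayList<BrowserDimension>();
    if (StringUtils.isBlank(browserName)) {
        browserName = "unknown"; // assumption: blank names get a placeholder bucket
        browserVersion = "unknown";
    }
    list.add(new BrowserDimension(browserName, "all"));           // "all versions" bucket
    list.add(new BrowserDimension(browserName, browserVersion)); // version-specific bucket
    return list;
}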
Reducer:
package com.sxt.transformer.mr.nu;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapreduce.Reducer;

import com.sxt.common.KpiType;
import com.sxt.transformer.model.dim.StatsUserDimension;
import com.sxt.transformer.model.value.map.TimeOutputValue;
import com.sxt.transformer.model.value.reduce.MapWritableValue;

/**
 * Reducer for computing new install users.
 *
 * @author root
 */
public class NewInstallUserReducer extends Reducer<StatsUserDimension, TimeOutputValue, StatsUserDimension, MapWritableValue> {
    private MapWritableValue outputValue = new MapWritableValue();
    private Set<String> unique = new HashSet<String>();

    @Override
    protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> values, Context context) throws IOException, InterruptedException {
        this.unique.clear();
        // count the distinct uuids
        for (TimeOutputValue value : values) {
            this.unique.add(value.getId()); // uuid: the user id
        }
        MapWritable map = new MapWritable(); // comparable to a java HashMap
        map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
        outputValue.setValue(map);
        // set the kpi name (module name) so the output side knows which table to insert into
        String kpiName = key.getStatsCommon().getKpi().getKpiName();
        if (KpiType.NEW_INSTALL_USER.name.equals(kpiName)) {
            // new users for the stats_user table
            outputValue.setKpi(KpiType.NEW_INSTALL_USER);
        } else if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)) {
            // new users for the stats_device_browser table
            outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
        }
        context.write(key, outputValue);
    }
}
The statistics dimension key StatsUserDimension:
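The full source of StatsUserDimension ships with the project archive (wjy.rar, see the end of this post). From the way the mapper and reducer use it (getStatsCommon(), setBrowser(...), and its role as the shuffle key), its shape is roughly the following; this is a hedged sketch only, and the field layout, base types and serialization order are assumptions:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hedged sketch only; the real class lives in com.sxt.transformer.model.dim.
// It combines the common dimensions (date/platform/kpi) with a browser
// dimension, and must be serializable and comparable to serve as the MR key.
public class StatsUserDimension implements WritableComparable<StatsUserDimension> {
    private StatsCommonDimension statsCommon = new StatsCommonDimension();
    private BrowserDimension browser = new BrowserDimension();

    public StatsCommonDimension getStatsCommon() { return statsCommon; }
    public BrowserDimension getBrowser() { return browser; }
    public void setBrowser(BrowserDimension browser) { this.browser = browser; }

    @Override
    public void write(DataOutput out) throws IOException {
        // nested dimensions are serialized in a fixed order
        this.statsCommon.write(out);
        this.browser.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.statsCommon.readFields(in);
        this.browser.readFields(in);
    }

    @Override
    public int compareTo(StatsUserDimension o) {
        // assumption: ordering delegates to the nested dimensions
        int r = this.statsCommon.compareTo(o.statsCommon);
        return r != 0 ? r : this.browser.compareTo(o.browser);
    }
}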
The custom OutputFormat class that writes to MySQL:
package com.sxt.transformer.mr;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.sxt.common.GlobalConstants;
import com.sxt.common.KpiType;
import com.sxt.transformer.model.dim.base.BaseDimension;
import com.sxt.transformer.model.value.BaseStatsValueWritable;
import com.sxt.transformer.service.IDimensionConverter;
import com.sxt.transformer.service.impl.DimensionConverterImpl;
import com.sxt.util.JdbcManager;

/**
 * Custom OutputFormat that writes to MySQL.
 * BaseDimension: the reducer output key
 * BaseStatsValueWritable: the reducer output value
 * @author root
 */
public class TransformerOutputFormat extends OutputFormat<BaseDimension, BaseStatsValueWritable> {
    private static final Logger logger = Logger.getLogger(TransformerOutputFormat.class);

    /**
     * Defines how each record is written; a record is what the reducer emits
     * on each call to write.
     */
    @Override
    public RecordWriter<BaseDimension, BaseStatsValueWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Connection conn = null;
        IDimensionConverter converter = new DimensionConverterImpl();
        try {
            conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
            conn.setAutoCommit(false);
        } catch (SQLException e) {
            logger.error("Failed to get a database connection", e);
            throw new IOException("Failed to get a database connection", e);
        }
        return new TransformerRecordWriter(conn, conf, converter);
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // nothing to check when the output goes to mysql
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * The RecordWriter that actually writes the data.
     *
     * @author root
     */
    public class TransformerRecordWriter extends RecordWriter<BaseDimension, BaseStatsValueWritable> {
        private Connection conn = null;
        private Configuration conf = null;
        private IDimensionConverter converter = null;
        private Map<KpiType, PreparedStatement> map = new HashMap<KpiType, PreparedStatement>();
        private Map<KpiType, Integer> batch = new HashMap<KpiType, Integer>();

        public TransformerRecordWriter(Connection conn, Configuration conf, IDimensionConverter converter) {
            super();
            this.conn = conn;
            this.conf = conf;
            this.converter = converter;
        }

        /**
         * Called by the framework whenever the reduce task outputs a record;
         * writes the reducer output to mysql.
         */
        @Override
        public void write(BaseDimension key, BaseStatsValueWritable value) throws IOException, InterruptedException {
            if (key == null || value == null) {
                return;
            }
            try {
                KpiType kpi = value.getKpi();
                PreparedStatement pstmt = null; // one pstmt per sql statement
                int count = 1; // jdbc batching: statements are executed in batches (e.g. every 10 records)
                if (map.get(kpi) == null) {
                    // the per-kpi sql is looked up in the configuration by kpi name
                    pstmt = this.conn.prepareStatement(conf.get(kpi.name));
                    map.put(kpi, pstmt);
                } else {
                    pstmt = map.get(kpi);
                    count = batch.get(kpi);
                    count++;
                }
                batch.put(kpi, count); // remember the batch counter

                String collectorName = conf.get(GlobalConstants.OUTPUT_COLLECTOR_KEY_PREFIX + kpi.name);
                Class<?> clazz = Class.forName(collectorName);
                // the collector binds the value into the pstmt; each kpi has its
                // own collector because different kpis go to different tables
                IOutputCollector collector = (IOutputCollector) clazz.newInstance();
                collector.collect(conf, key, value, pstmt, converter);
                if (count % Integer.valueOf(conf.get(GlobalConstants.JDBC_BATCH_NUMBER, GlobalConstants.DEFAULT_JDBC_BATCH_NUMBER)) == 0) {
                    pstmt.executeBatch();
                    conn.commit();
                    batch.put(kpi, 0); // reset the batch counter
                }
            } catch (Throwable e) {
                logger.error("Writing data in the record writer failed", e);
                throw new IOException(e);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                // flush any remaining batched statements
                for (Map.Entry<KpiType, PreparedStatement> entry : this.map.entrySet()) {
                    entry.getValue().executeBatch();
                }
            } catch (SQLException e) {
                logger.error("executeBatch failed", e);
                throw new IOException(e);
            } finally {
                try {
                    if (conn != null) {
                        conn.commit(); // commit the connection
                    }
                } catch (Exception e) {
                    // nothing
                } finally {
                    for (Map.Entry<KpiType, PreparedStatement> entry : this.map.entrySet()) {
                        try {
                            entry.getValue().close();
                        } catch (SQLException e) {
                            // nothing
                        }
                    }
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (Exception e) {
                            // nothing
                        }
                    }
                }
            }
        }
    }
}
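TransformerOutputFormat itself only wires things together: the per-KPI SQL comes from query-mapping.xml and the class name of the parameter-binding collector from output-collector.xml. The IOutputCollector implementations are part of the project archive and are not shown in this post. Purely as a hedged illustration of the contract (the collect signature is taken from the call above; the converter method name getDimensionIdByValue and the MapWritableValue accessor are assumptions), a collector for the stats_user new-user KPI might look like:

// Hypothetical sketch of an IOutputCollector implementation; the actual
// classes ship with the project (wjy.rar). Assumes the configured SQL is an
// INSERT ... ON DUPLICATE KEY UPDATE over stats_user, and that
// IDimensionConverter exposes getDimensionIdByValue(BaseDimension)
// (method name assumed) to translate a dimension object into its MySQL id.
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;

public class NewInstallUserCollector implements IOutputCollector {
    @Override
    public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value,
                        PreparedStatement pstmt, IDimensionConverter converter)
            throws SQLException, IOException {
        StatsUserDimension statsUser = (StatsUserDimension) key;
        MapWritableValue mapValue = (MapWritableValue) value;
        // the reducer stored the distinct-user count under the key -1
        int newInstallUsers = ((IntWritable) mapValue.getValue().get(new IntWritable(-1))).get();

        int i = 0;
        pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
        pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
        pstmt.setInt(++i, newInstallUsers);
        pstmt.setInt(++i, newInstallUsers); // repeated for the ON DUPLICATE KEY UPDATE clause
        pstmt.addBatch(); // executed in batches by TransformerRecordWriter
    }
}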
For the full project code, see: wjy.rar