  • E-commerce Platform Log Analysis System (Big Data), Part 2

    ## The ETL stage

     ETL step: after cleaning, the data is loaded into HBase.
    
    zkServer.sh start;  start-dfs.sh  ;
    
    [root@node1 ~]# ./shells/start-yarn-ha.sh 
    start-yarn.sh
    ssh root@node3 "$HADOOP_HOME/sbin/yarn-daemon.sh start resourcemanager"
    ssh root@node4 "$HADOOP_HOME/sbin/yarn-daemon.sh start resourcemanager"
    
    start-hbase.sh 

    ------------------- Create the HBase table
    hbase shell
    
    hbase(main):001:0> create 'eventlog','log'
    
    ## After running the program below, scan the table to check the post-ETL data.
    

      

    ------------------------------ Run project BIG_DATA_SXT_1 with the following configuration changes

    public class AnalyserLogDataRunner implements Tool {
        private static final Logger logger = Logger.getLogger(AnalyserLogDataRunner.class);
        private Configuration conf = null;

        public static void main(String[] args) {
            try {
                ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
            } catch (Exception e) {
                logger.error("Exception while running the log-parsing job", e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public void setConf(Configuration conf) {
            conf.set("fs.defaultFS", "hdfs://node1:8020");
            // conf.set("yarn.resourcemanager.hostname", "node3");
            conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
            this.conf = HBaseConfiguration.create(conf);
        }
        .......
    }
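    Besides the shell scan, a quick Java check can confirm that rows landed in eventlog. This is only a minimal sketch using the same classic HBase client API as the project code above (HTable is deprecated in newer HBase releases); the row limit is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class EventLogScanCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
            // old-style client, consistent with the rest of this project
            HTable table = new HTable(conf, "eventlog");
            ResultScanner scanner = table.getScanner(new Scan());
            int n = 0;
            for (Result r : scanner) {
                // print only the rowkey; add CellUtil.cloneValue(...) calls to inspect columns
                System.out.println(Bytes.toString(r.getRow()));
                if (++n >= 10) break; // peek at the first 10 rows only
            }
            scanner.close();
            table.close();
        }
    }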

     

     

    ------------- Next section:
    The data above still does not meet our needs (too few distinct IPs and dates),
    so a data-generation class is needed: /BIG_DATA_SXT_1/test/com/sxt/test/TestDataMaker.java
    

     

    The analysis module is used as one of the grouping conditions:

    time, user basic-info module
    time, browser, browser-analysis module
    
    2018-08-10   www.bjsxt.com   zhangsan  firefox-48
    2018-08-10   www.bjsxt.com	 lisi	   firefox-53
    
    map
    2018-08-10,user      						zhangsan
    2018-08-10,firefox-48,browser				zhangsan 
    2018-08-10,firefox-all,browser				zhangsan 
    
    2018-08-10,user      						lisi
    2018-08-10,firefox-53,browser				lisi 
    2018-08-10,firefox-all,browser				lisi 
    reduce
    2018-08-10,user      						zhangsan		2
    2018-08-10,user      						lisi
    
    2018-08-10,firefox-48,browser				zhangsan 		1
    
    2018-08-10,firefox-all,browser				zhangsan 		2
    2018-08-10,firefox-all,browser				lisi 
    
    2018-08-10,firefox-53,browser				lisi 			1
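    Each map output key is thus a combination of date, KPI (module) and, optionally, browser; the project models this with StatsUserDimension / StatsCommonDimension (shown below). As a standalone illustration only, a hypothetical composite key written as a Hadoop WritableComparable could look like this (the class and field names are made up for the sketch):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical composite key: date + kpi + browser
    public class DimKey implements WritableComparable<DimKey> {
        private String date = "";     // e.g. 2018-08-10
        private String kpi = "";      // e.g. "user" or "browser"
        private String browser = "";  // e.g. "firefox-48", or "" for the non-browser KPI

        public DimKey() {}

        public DimKey(String date, String kpi, String browser) {
            this.date = date; this.kpi = kpi; this.browser = browser;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(date); out.writeUTF(kpi); out.writeUTF(browser);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            date = in.readUTF(); kpi = in.readUTF(); browser = in.readUTF();
        }

        @Override
        public int compareTo(DimKey o) {
            int c = date.compareTo(o.date);
            if (c != 0) return c;
            c = kpi.compareTo(o.kpi);
            if (c != 0) return c;
            return browser.compareTo(o.browser);
        }

        @Override
        public int hashCode() { return (date + "|" + kpi + "|" + browser).hashCode(); }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof DimKey)) return false;
            DimKey d = (DimKey) obj;
            return date.equals(d.date) && kpi.equals(d.kpi) && browser.equals(d.browser);
        }
    }

    Rows sharing the same (date, kpi, browser) combination then arrive in the same reduce() call, where the uuids are de-duplicated and counted, exactly as in the worked example above.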
    
    ## Metrics and metric combinations
    

      

     

    Apache Kylin (an OLAP engine for pre-computing metric/dimension combinations):
    http://kylin.apache.org
    

      

    Use MapReduce to compute the metrics, then write the results into MySQL.
    package com.sxt.transformer.mr.nu1;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import com.sxt.common.DateEnum;
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.KpiType;
    import com.sxt.transformer.model.dim.StatsCommonDimension;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.dim.base.BrowserDimension;
    import com.sxt.transformer.model.dim.base.DateDimension;
    import com.sxt.transformer.model.dim.base.KpiDimension;
    import com.sxt.transformer.model.dim.base.PlatformDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    
    public class NewInstallUserMapper extends TableMapper<StatsUserDimension, TimeOutputValue>{
    	// value object emitted by the mapper
    	TimeOutputValue timeOutputValue = new TimeOutputValue();
    	// key object emitted by the mapper
    	StatsUserDimension statsUserDimension = new StatsUserDimension();
    	
    	// column family of the event log table
    	byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    	
    	// module (KPI) dimensions:
    	// the user basic-info module and the browser-analysis module
    	KpiDimension newInstallUser = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
    	KpiDimension newInstallUserOfBrowser = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);
    	
    	
    	
    	@Override
    	protected void map(ImmutableBytesWritable key, Result value,Context context)
    			throws IOException, InterruptedException {
    		// read the fields: server time, browser info, uuid, platform
    		
    		String time = Bytes.toString(CellUtil.cloneValue(value.getColumnLatestCell(family, 
    				Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))));
    		String browserName = Bytes.toString(CellUtil.cloneValue(value.getColumnLatestCell(family,
    				Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME))));
    		String browserVersion = Bytes.toString(CellUtil.cloneValue(value.getColumnLatestCell(family, 
    				Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION))));
    		String platform = Bytes.toString(CellUtil.cloneValue(value.getColumnLatestCell(family,
    				Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM))));
    		
    		String uuid = Bytes.toString(CellUtil.cloneValue(value.getColumnLatestCell(family,
    				Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID))));
    		// build the date dimension
    		Long timeOfLong = Long.valueOf(time);
    		DateDimension dateDimension = DateDimension.buildDate(timeOfLong, DateEnum.DAY);
    		
    		// build the browser dimensions
    		List<BrowserDimension> browserDimensions = BrowserDimension.buildList(browserName, browserVersion);
    		
    		// build the platform dimensions (javaweb, php)
    		List<PlatformDimension> platformDimensions = PlatformDimension.buildList(platform);
    		
    		// populate the output value object
    		timeOutputValue.setId(uuid);
    		timeOutputValue.setTime(timeOfLong);
    		
    		
    		// build the dimension combinations
    		// get the shared common-dimension object
    		StatsCommonDimension statsCommonDimension = statsUserDimension.getStatsCommon();
    		statsCommonDimension.setDate(dateDimension);
    		
    		BrowserDimension defaultBrowserDimension = new BrowserDimension("","");	
    		for (PlatformDimension pf : platformDimensions) {
    			statsCommonDimension.setPlatform(pf);
    			statsCommonDimension.setKpi(newInstallUser);
    			statsUserDimension.setBrowser(defaultBrowserDimension); // KPI without a browser dimension
    			
    			context.write(statsUserDimension, timeOutputValue);
    			for (BrowserDimension br : browserDimensions) { // KPIs with a browser dimension
    				statsCommonDimension.setKpi(newInstallUserOfBrowser);
    				statsUserDimension.setBrowser(br);
    				context.write(statsUserDimension, timeOutputValue);
    			}
    			
    		}
    	}
    
    }
    
    package com.sxt.transformer.mr.nu1;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import com.sxt.common.KpiType;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    import com.sxt.transformer.model.value.reduce.MapWritableValue;
    
    public class NewInstallUserReducer extends Reducer<StatsUserDimension, TimeOutputValue, StatsUserDimension,MapWritableValue> {
    	// value object emitted by the reducer
    	MapWritableValue mapWritableValue =  new MapWritableValue();
    	// set used for de-duplicating uuids
    	Set<String> unique = new HashSet<String>();
    	
    	@Override
    	protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> value,
    			Context context)
    			throws IOException, InterruptedException {
    		
    		// each reduce() call handles one key group; unique is shared between calls, so clear it first
    		this.unique.clear();
    		// add the map-side ids to the set to de-duplicate them
    		for (TimeOutputValue timeOutputValue : value) {
    			this.unique.add(timeOutputValue.getId());
    		}
    		// populate the output object
    		// store the count in a MapWritable (which works much like a HashMap)
    		MapWritable map  = new MapWritable();
    		// the key -1 is arbitrary; the consumer reads the size back out under the same key
    		map.put(new IntWritable(-1),new IntWritable(this.unique.size()));
    		// hand the map to the output value object
    		mapWritableValue.setValue(map);
    		
    		// tag the reduce value with the KPI name so the output side knows which MySQL table to write to
    		String kpiName = key.getStatsCommon().getKpi().getKpiName();
    		
    		if(KpiType.NEW_INSTALL_USER.name.equals(kpiName)){
    			mapWritableValue.setKpi(KpiType.NEW_INSTALL_USER);
    		}else if(KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)){
    			mapWritableValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
    		}
    		// emit the result
    		context.write(key, mapWritableValue);
    	}
    
    }
    
    package com.sxt.transformer.mr.nu1;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.sxt.common.EventLogConstants;
    import com.sxt.common.GlobalConstants;
    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.value.map.TimeOutputValue;
    import com.sxt.transformer.model.value.reduce.MapWritableValue;
    import com.sxt.transformer.mr.TransformerOutputFormat;
    import com.sxt.util.TimeUtil;
    
    public class NewInstallUserRunner implements Tool {
    	
    	private Configuration conf;
    	
    	public static void main(String[] args) {
    		try {
    			ToolRunner.run(new Configuration(), new NewInstallUserRunner(), args);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	@Override
    	public Configuration getConf() {
    		return this.conf;
    	}
    
    	@Override
    	public void setConf(Configuration conf) {
    		this.conf = conf;
    		conf.set("fs.defaultFS", "hdfs://node1:8020");
    		conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
    		conf.addResource("output-collector.xml");
    		conf.addResource("query-mapping.xml");
    		conf.addResource("transformer-env.xml");
    		this.conf = HBaseConfiguration.create(conf);
    		
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		Configuration conf = this.getConf();
    		
    		this.processArgs(conf,arg0);
    		Job job = Job.getInstance(conf,"new install user");
    		job.setJarByClass(NewInstallUserRunner.class);
    		
    		TableMapReduceUtil.initTableMapperJob(initScans(job), 
    				NewInstallUserMapper.class,StatsUserDimension.class, 
    				TimeOutputValue.class, job, false);
    		job.setReducerClass(NewInstallUserReducer.class);
    		job.setOutputKeyClass(StatsUserDimension.class);
    		job.setOutputValueClass(MapWritableValue.class);
    		job.setOutputFormatClass(TransformerOutputFormat.class);
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	
    	private void processArgs(Configuration conf2, String[] args) {
    		String date = "";
    		for (int i = 0; i < args.length; i++) {
    			if("-d".equals(args[i])){
    				if(i+1 < args.length){
    					date = args[++i];
    				}
    			}
    		}
    		
    		if(StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)){
    			date = TimeUtil.getYesterday();
    		}
    		
    		conf2.set(GlobalConstants.RUNNING_DATE_PARAMES,date);
    		
    	}
    	
    	private List<Scan> initScans(Job job) {
    		
    		Configuration conf = job.getConfiguration();
    		String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
    		
    		long time = TimeUtil.parseString2Long(date);
    		String startRow = String.valueOf(time);
    		
    		String stopRow = String.valueOf(time + GlobalConstants.DAY_OF_MILLISECONDS); // end of the day: start time plus one day in ms
    		
    		// scan the data
    		Scan scan = new Scan();
    		// restrict the scan to the given day via the time prefix of the rowkey
    		scan.setStartRow(startRow.getBytes());
    		scan.setStopRow(stopRow.getBytes());
    		
    		// keep only rows whose event name is e_l (the launch event)
    		FilterList lists = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    		
    		SingleColumnValueFilter filter1 = new SingleColumnValueFilter(EventLogConstants.EVENT_LOGS_FAMILY_NAME.getBytes(),
    				EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME.getBytes(), CompareOp.EQUAL,
    				"e_l".getBytes());
    		
    		// fetch only the columns we need
    		// column names to fetch
    		String[] columns = {EventLogConstants.LOG_COLUMN_NAME_UUID,
    				EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,
    				EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION,
    				EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
    				EventLogConstants.LOG_COLUMN_NAME_PLATFORM};
    		lists.addFilter(filter1);
    		lists.addFilter(getColumn(columns));
    		scan.setFilter(lists); // apply the combined filter list to the scan
    		
    		scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, EventLogConstants.HBASE_NAME_EVENT_LOGS.getBytes());
    		
    		return Arrays.asList(scan);
    	}
    	private Filter getColumn(String[] columns) {
    		int length = columns.length;
    		byte[][] buffer = new byte[length][];
    		
    		for (int i = 0; i < length; i++) {
    			buffer[i] = columns[i].getBytes();
    		}
    		
    		return new MultipleColumnPrefixFilter(buffer);
    	}
    
    }
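    TransformerOutputFormat, referenced in the Runner above, is a project class whose source is not shown here. As a rough sketch of the idea only (not the project's actual implementation), a custom OutputFormat whose RecordWriter pushes every reduce output row to MySQL over JDBC could look like the following; the config keys mysql.url/mysql.user/mysql.password, the table and column names, and the assumption that MapWritableValue exposes a getValue() accessor are all illustrative:

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    import com.sxt.transformer.model.dim.StatsUserDimension;
    import com.sxt.transformer.model.value.reduce.MapWritableValue;

    public class JdbcStatsOutputFormat extends OutputFormat<StatsUserDimension, MapWritableValue> {

        @Override
        public RecordWriter<StatsUserDimension, MapWritableValue> getRecordWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            try {
                // open one JDBC connection per reduce task
                Connection conn = DriverManager.getConnection(
                        conf.get("mysql.url"), conf.get("mysql.user"), conf.get("mysql.password"));
                return new JdbcRecordWriter(conn);
            } catch (Exception e) {
                throw new IOException("cannot open MySQL connection", e);
            }
        }

        @Override
        public void checkOutputSpecs(JobContext context) {
            // nothing to validate: output goes to MySQL, not HDFS
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            // no HDFS output to commit; reuse the no-op committer from NullOutputFormat
            return new NullOutputFormat<StatsUserDimension, MapWritableValue>().getOutputCommitter(context);
        }

        static class JdbcRecordWriter extends RecordWriter<StatsUserDimension, MapWritableValue> {
            private final Connection conn;

            JdbcRecordWriter(Connection conn) { this.conn = conn; }

            @Override
            public void write(StatsUserDimension key, MapWritableValue value) throws IOException {
                // In the real project the SQL for each KPI comes from query-mapping.xml and the
                // dimension ids come from DimensionConverterImpl; this single statement is illustrative.
                String sql = "INSERT INTO stats_user(date_dimension_id, platform_dimension_id, new_install_users) "
                           + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE new_install_users = VALUES(new_install_users)";
                try (PreparedStatement ps = conn.prepareStatement(sql)) {
                    ps.setInt(1, 0); // placeholder: convert key.getStatsCommon().getDate() to a dimension id
                    ps.setInt(2, 0); // placeholder: convert the platform dimension to a dimension id
                    // the reducer stored the de-duplicated user count under the key -1
                    ps.setInt(3, ((IntWritable) value.getValue().get(new IntWritable(-1))).get());
                    ps.executeUpdate();
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                try { conn.close(); } catch (Exception e) { throw new IOException(e); }
            }
        }
    }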
    ## First truncate the old data in the eventlog table.
    hbase(main):004:0> truncate 'eventlog'
    ## Then generate new mock data; adjust the date and the account/password settings as needed.
    /BIG_DATA_SXT_1/test/com/sxt/test/TestDataMaker.java
    package com.sxt.test;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.zip.CRC32;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import com.sxt.common.EventLogConstants;
    import com.sxt.etl.util.IPSeekerExt;
    import com.sxt.etl.util.IPSeekerExt.RegionInfo;
    import com.sxt.etl.util.LoggerUtil;
    
    public class TestDataMaker {
    	
    	// table name
    	private static String TN = "eventlog";
    	
    	public static void main(String[] args) throws Exception {
    		
    		TestDataMaker tDataMaker = new TestDataMaker();
    		
    		Random r = new Random();
    		
    		Configuration conf = new Configuration();
    		conf.set("hbase.zookeeper.quorum", "node2,node3,node4");
    
    		HBaseAdmin admin = new HBaseAdmin(conf);
    		HTable hTable = new HTable(conf, TN);
    		
    		// user identifier u_ud: random 8 digits
    		String uuid = String.format("%08d", r.nextInt(99999999));
    		// member identifier u_mid: random 8 digits
    		String memberId = String.format("%08d", r.nextInt(99999999));
    
    		List<Put> puts = new ArrayList<Put>();
    		for (int i = 0; i < 100; i++) {
    
    			if(i%5==0) {
    				uuid = String.format("%08d", r.nextInt(99999999));
    				memberId = String.format("%08d", r.nextInt(99999999));
    			}
    			if(i%6==0) {
    				uuid = String.format("%08d", r.nextInt(99999999));
    				memberId = String.format("%08d", r.nextInt(99999999));
    			}
    			
    			Date d = tDataMaker.getDate("20190831");
    			
    			String serverTime = ""+d.getTime();
    			
    			Put put = tDataMaker.putMaker(uuid, memberId, serverTime);
    			puts.add(put);
    		}
    		hTable.put(puts);
    	}
    	
    	Random r = new Random();
    	
    	private static IPSeekerExt ipSeekerExt = new IPSeekerExt();
    	
    	/**
    	 * Build one mock event-log Put from the given uuid, memberId and serverTime.
    	 */
    	public Put putMaker(String uuid, String memberId, String serverTime) {
    
    		Map<String, Put> map = new HashMap<String, Put>();
    		
    		byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    		
    		// parse a (placeholder) log line into a field map
    		Map<String, String> clientInfo = LoggerUtil.handleLog("......");
    
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, serverTime);
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_UUID, uuid);
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PLATFORM, "website");
    		
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME, EventNames[r.nextInt(EventNames.length)]);
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SESSION_ID, SessionIDs[r.nextInt(SessionIDs.length)]);
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL, CurrentURLs[r.nextInt(CurrentURLs.length)]);
    		
    		
    		clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, this.getOsName());
            clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, this.getOsVersion());
            clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, this.getBrowserName());
            clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, this.getBrowserVersion());
            
            String ip = IPs[r.nextInt(IPs.length)];
            RegionInfo info = ipSeekerExt.analyticIp(ip);
            if (info != null) {
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity());
            }
    		
    		String eventName = EventNames[r.nextInt(EventNames.length)];
    		
    		// generate the rowkey
    		String rowkey = this.generateRowKey(uuid, memberId, eventName, serverTime);
    		
    		Put put = new Put(Bytes.toBytes(rowkey));
    		for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
    			put.add(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
    		}
    		
    		return put;
    	}
    	
    	private String[] CurrentURLs = new String[]{"http://www.jd.com",
    			"http://www.tmall.com","http://www.sina.com","http://www.weibo.com"};
    	
    	private String[] SessionIDs = new String[]{"1A3B4F83-6357-4A64-8527-F092169746D3",
    			"12344F83-6357-4A64-8527-F09216974234","1A3B4F83-6357-4A64-8527-F092169746D8"};
    
    	private String[] IPs = new String[]{"58.42.245.255","39.67.154.255",
    			"23.13.191.255","14.197.148.38","14.197.149.137","14.197.201.202","14.197.243.254"};
    	
    	private String[] EventNames = new String[]{"e_l","e_pv"};
    	
    	private String[] BrowserNames = new String[]{"FireFox","Chrome","aoyou","360"};
    	
    	/**
    	 * Pick a random browser name.
    	 * @return
    	 */
    	private String getBrowserName() {
    		return BrowserNames[r.nextInt(BrowserNames.length)];
    	}
    
    
    	/**
    	 * Pick a random browser version.
    	 * @return
    	 */
    	private String getBrowserVersion() {
    		return (""+r.nextInt(9));
    	}
    
    	/**
    	 * Pick a random OS version.
    	 * @return
    	 */
    	private String getOsVersion() {
    		return (""+r.nextInt(3));
    	}
    
    	private String[] OsNames = new String[]{"window","linux","ios"};
    	/**
    	 * Pick a random OS name.
    	 * @return
    	 */
    	private String getOsName() {
    		return OsNames[r.nextInt(OsNames.length)];
    	}
    
    	private CRC32 crc32 = new CRC32();
    	
    	/**
    	 * Build the rowkey from serverTime plus a CRC32 of uuid, memberId and the event alias.
    	 * @param uuid
    	 * @param memberId
    	 * @param eventAliasName
    	 * @param serverTime
    	 * @return
    	 */
    	private String generateRowKey(String uuid, String memberId, String eventAliasName, String serverTime) {
    		StringBuilder sb = new StringBuilder();
    		sb.append(serverTime).append("_");
    		this.crc32.reset();
    		if (StringUtils.isNotBlank(uuid)) {
    			this.crc32.update(uuid.getBytes());
    		}
    		if (StringUtils.isNotBlank(memberId)) {
    			this.crc32.update(memberId.getBytes());
    		}
    		this.crc32.update(eventAliasName.getBytes());
    		sb.append(this.crc32.getValue() % 100000000L);
    		return sb.toString();
    	}
    
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    	
    	/**
    	 * Generate a random time within the given day.
    	 * @param str  date as yyyyMMdd, e.g. 20160101
    	 * @return
    	 */
    	public Date getDate(String str) {
    		str = str + String.format("%02d%02d%02d", new Object[]{r.nextInt(24), r.nextInt(60), r.nextInt(60)});
    		Date d = new Date();
    		try {
    			d = sdf.parse(str);
    		} catch (ParseException e) {
    			e.printStackTrace();
    		}
    		return d;
    	}
    }
    
    
    Update the configuration in transformer-env.xml and com.sxt.transformer.service.impl.DimensionConverterImpl.
    Run /BIG_DATA_SXT_2/src/com/sxt/transformer/mr/nu1/NewInstallUserRunner.java
    (main, Run Configuration arguments: -d 2019-09-01). The data is inserted into MySQL successfully.
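    To sanity-check the MySQL output, a small JDBC query sketch such as the following can be used. The JDBC URL, database name, credentials and the table name stats_user are only guesses for illustration; check transformer-env.xml and the output-collector configuration for the real values:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class MysqlResultCheck {
        public static void main(String[] args) throws Exception {
            // url, user, password and table name are assumptions, not the project's actual settings
            String url = "jdbc:mysql://node1:3306/report?useUnicode=true&characterEncoding=utf8";
            try (Connection conn = DriverManager.getConnection(url, "root", "123456");
                 Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT * FROM stats_user LIMIT 10")) {
                ResultSetMetaData md = rs.getMetaData();
                while (rs.next()) {
                    StringBuilder row = new StringBuilder();
                    // print every column of each row without assuming column names
                    for (int i = 1; i <= md.getColumnCount(); i++) {
                        row.append(md.getColumnName(i)).append('=').append(rs.getString(i)).append("  ");
                    }
                    System.out.println(row);
                }
            }
        }
    }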
    

