Setup commands: before running anything, start the cluster (start-all.sh), ZooKeeper (zkServer.sh start), the MapReduce job history server (mr-jobhistory-daemon.sh start historyserver), and HBase (start-hbase.sh).
Then create the table in the HBase shell:
create 'eventlog','log'
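If you prefer to create the table from code instead of the shell, a minimal sketch using the older HBaseAdmin client API (which matches the HBase version this project targets) could look like the following. The class name CreateEventLogTable and its package are made up for illustration; the quorum string is the one used in setConf below.

package com.yjsj.etl.util;  // hypothetical package, for illustration only

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateEventLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2"); // same quorum as setConf below
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("eventlog")) {
                HTableDescriptor desc = new HTableDescriptor("eventlog"); // table name
                desc.addFamily(new HColumnDescriptor("log"));             // column family
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}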
The AnalyserLogDataRunner class
The steps below may fail with connection errors; if they do, add the following two lines in setConf:
configuration.set("hbase.master", "master:60000");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
Getting the input path: the alternative form sketched below also works; it expresses the same path differently.
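For reference, here is a sketch of that alternative, assuming the same TimeUtil helpers used in the full listing further down. It passes the fully qualified HDFS URI to FileInputFormat.setInputPaths instead of a relative path via addInputPath (which resolves against fs.defaultFS to the same location); unlike the full listing, this variant omits the existence check.

private void setJobInputPaths(Job job) throws IOException {
    Configuration conf = job.getConfiguration();
    String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
    // Fully qualified URI form; equivalent to the relative-path version below
    Path inputPath = new Path("hdfs://master:9000/project/log/"
            + TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyyMMdd") + "/");
    FileInputFormat.setInputPaths(job, inputPath);
}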
The AnalyserLogDataMapper class
The rowkey generated this way would otherwise be very long; running the id fields through CRC32 keeps it short.
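To see what that produces, here is a small standalone sketch mirroring the generateRowKey logic from the mapper listing below. The uuid, member id, event alias, and timestamp values are made up for the example.

import java.util.zip.CRC32;

public class RowKeyDemo {
    public static void main(String[] args) {
        CRC32 crc32 = new CRC32();
        crc32.update("uuid-0001".getBytes());  // sample uuid
        crc32.update("member-42".getBytes());  // sample member id
        crc32.update("e_l".getBytes());        // sample event alias
        // The serverTime prefix keeps rows time-ordered; taking the CRC32
        // checksum mod 100000000 caps the suffix at 8 digits instead of
        // concatenating the raw ids.
        String rowkey = "1501576397000_" + (crc32.getValue() % 100000000L);
        System.out.println(rowkey);
    }
}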
The complete AnalyserLogDataRunner class:

package com.yjsj.etl.mr;

import com.yjsj.common.EventLogConstants;
import com.yjsj.common.GlobalConstants;
import com.yjsj.util.TimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import java.util.logging.Logger;
import org.apache.log4j.Logger;

import java.io.IOException;

public class AnalyserLogDataRunner implements Tool {
    //public static final Logger log = Logger.getGlobal();
    // Note: this uses the log4j Logger, not java.util.logging
    public static final Logger log = Logger.getLogger(AnalyserLogDataRunner.class);
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
        } catch (Exception e) {
            log.error("Exception while running the log-parsing job", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration configuration) {
        configuration.set("hbase.zookeeper.quorum", "master,node1,node2");
        configuration.set("fs.defaultFS", "hdfs://master:9000");
        configuration.set("hbase.master", "master:60000");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        this.conf = HBaseConfiguration.create(configuration);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        this.processArgs(conf, args);

        Job job = Job.getInstance(conf, "analyser_logdata");

        // To submit from the local machine but run on the cluster, code like
        // the following is needed:
        //File jarFile = EJob.createTempJar("target/classes");
        //((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        // End of local-submit/cluster-run code

        job.setJarByClass(AnalyserLogDataRunner.class);
        job.setMapperClass(AnalyserLogDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Configure the reduce side.
        // 1. Run on the cluster from a jar (requires addDependencyJars to be true, its default):
        //TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
        // 2. Run locally (requires addDependencyJars to be false):
        TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,
                null, job, null, null, null, null, false);
        // HBASE_NAME_EVENT_LOGS is the table-name constant wrapping the value "eventlog"
        job.setNumReduceTasks(0);

        this.setJobInputPaths(job);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    private void setJobInputPaths(Job job) {
        Configuration conf = job.getConfiguration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
            Path inputPath = new Path("/project/log/"
                    + TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyyMMdd") + "/");
            if (fs.exists(inputPath)) {
                FileInputFormat.addInputPath(job, inputPath);
            } else {
                throw new RuntimeException("Input path does not exist: " + inputPath);
            }
            System.out.println("*******" + inputPath.toString());
        } catch (IOException e) {
            throw new RuntimeException("Exception while setting the job's MapReduce input path", e);
        } finally {
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    //e.printStackTrace();
                }
            }
        }
    }

    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-d".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[++i];
                    break;
                }
            }
        }
        System.out.println("------" + date);
        // The date must be in yyyy-MM-dd format.
        // Note: StringUtils below comes from the org.apache.commons.lang package.
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // date is invalid; fall back to yesterday
            date = TimeUtil.getYesterday();
            System.out.println(date);
        }
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }
}
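Once packaged, the job can be submitted with the -d flag that processArgs parses, for example: hadoop jar log-etl.jar com.yjsj.etl.mr.AnalyserLogDataRunner -d 2017-08-10 (the jar name log-etl.jar is just a placeholder for your own build artifact). If -d is missing or the value is not a valid yyyy-MM-dd date, the runner falls back to yesterday's date.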
The complete AnalyserLogDataMapper class:

package com.yjsj.etl.mr;

import com.yjsj.common.EventLogConstants;
import com.yjsj.common.GlobalConstants;
import com.yjsj.etl.util.LoggerUtil;
import com.yjsj.util.TimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

public class AnalyserLogDataMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
    private final Logger logger = Logger.getLogger(AnalyserLogDataMapper.class);
    // Counters that make it easy to see how much data was filtered out
    private int inputRecords, filterRecords, outputRecords;
    private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    private CRC32 crc32 = new CRC32();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        this.inputRecords++;
        this.logger.debug("Analyse data of: " + value);
        try {
            // Parse the log line
            Map<String, String> clientInfo = LoggerUtil.handleLog(value.toString());
            // Filter out lines that failed to parse
            if (clientInfo.isEmpty()) {
                this.filterRecords++;
                return;
            }
            String eventAliasName = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
            EventLogConstants.EventEnum event =
                    EventLogConstants.EventEnum.valueOfAlias(eventAliasName);
            switch (event) {
                case LAUNCH:
                case PAGEVIEW:
                case CHARGEREQUEST:
                case CHARGEREFUND:
                case CHARGESUCCESS:
                case EVENT:
                    // Handle the record
                    this.handleData(clientInfo, event, context);
                    break;
                default:
                    this.filterRecords++;
                    this.logger.warn("Cannot parse this event; event name: " + eventAliasName);
            }
        } catch (Exception e) {
            this.filterRecords++;
            this.logger.error("Exception while handling record: " + value, e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        logger.info("input records: " + this.inputRecords
                + ", output records: " + this.outputRecords
                + ", filtered records: " + this.filterRecords);
    }

    private void handleData(Map<String, String> clientInfo,
                            EventLogConstants.EventEnum event, Context context)
            throws IOException, InterruptedException {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
        String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
        if (StringUtils.isNotBlank(serverTime)) {
            // The server time must not be blank
            clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the user-agent info
            String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime); // timestamp prefix
            Put put = new Put(Bytes.toBytes(rowkey));
            for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
                if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                    // old-style HBase put API (pre-1.0); newer clients use addColumn
                    put.add(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                }
            }
            context.write(NullWritable.get(), put);
            this.outputRecords++;
        } else {
            this.filterRecords++;
        }
    }

    private String generateRowKey(String uuid, String memberId,
                                  String eventAliasName, String serverTime) {
        StringBuilder sb = new StringBuilder();
        sb.append(serverTime).append("_");
        this.crc32.reset();
        if (StringUtils.isNotBlank(uuid)) {
            this.crc32.update(uuid.getBytes());
        }
        if (StringUtils.isNotBlank(memberId)) {
            this.crc32.update(memberId.getBytes());
        }
        this.crc32.update(eventAliasName.getBytes());
        sb.append(this.crc32.getValue() % 100000000L);
        return sb.toString();
    }
}
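Note that with setNumReduceTasks(0) this is a map-only job: initTableReducerJob is used in the runner only to configure the table output format, so the Put objects emitted by the mapper are written straight to the eventlog table. Also, if EventEnum.valueOfAlias returns null for an unknown alias, the switch on that null value throws a NullPointerException, which the surrounding catch block absorbs and counts as a filtered record.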