1. Log sample
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90
2. Requirements
-》Count page views (PV)
-》Count the number of registrations
-》Count the number of distinct IPs
-》Compute the bounce rate, which first requires the number of bounced visitors (a HiveQL sketch of the rate follows this list)
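The script in section 4 only materialises the bounce count (jump_tb); the rate itself is the number of bounced visitors divided by the number of distinct IPs. A minimal HiveQL sketch of that division, assuming the cleaned table use_tb, database log_case, and partition column date created in section 4 (the partition value and the HIVE_HOME setting are illustrative):

# Hedged sketch: bounce rate = IPs with exactly one page view / all distinct IPs.
# Assumes HIVE_HOME is set as in the section 4 script, and the table/column names created there.
$HIVE_HOME/bin/hive --database log_case -e "
select count(if(cnt = 1, ip, null))             as bounced_ips,
       count(ip)                                as total_ips,
       count(if(cnt = 1, ip, null)) / count(ip) as bounce_rate
from (select ip, count(*) as cnt
      from use_tb
      where date = '2013_05_30'
      group by ip) t"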
3. Analysis
-》Data collection: a shell script uploads the logs to HDFS on a schedule (a crontab sketch follows this list)
-》Data cleaning: filter and reformat the fields
-》Data analysis: Hive partitioned tables
-》Data export: Sqoop
-》Frameworks used: shell scripts, HDFS, MapReduce, Hive, Sqoop, MySQL
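The scheduled upload in the first step is typically driven by cron. A minimal sketch, assuming the full pipeline script from section 4 is saved as /opt/datas/logs/log_daily.sh (the script name is illustrative):

# Illustrative crontab entry: run the daily pipeline at 00:30, after the previous
# day's access log has been rotated into /opt/datas/logs.
30 0 * * * /bin/bash /opt/datas/logs/log_daily.sh >> /opt/datas/logs/log_daily.out 2>&1

Install it with crontab -e under the user that owns the Hadoop and Hive client environment.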
4. Script
This script covers uploading the logs automatically, cleaning them, creating the Hive tables, running the analysis, and exporting the result to MySQL via Sqoop (a sketch of the Sqoop invocation follows the script).
#!/bin/bash

#get the yesterday date
yesterday=`date -d "-1 day" +"%Y_%m_%d"`

#define the HADOOP_HOME and HIVE_HOME
HADOOP_HOME=/opt/modules/hadoop-2.5.0
HIVE_HOME=/opt/modules/hive-0.13.1-bin
LOG_DIR=/opt/datas/logs
FILE=access_$yesterday.log
HDFS_DIR=/log/source/$yesterday

JAR_PATH=$LOG_DIR/logclean.jar
ENTRANCE=org.apache.hadoop.log.project.LogClean
OUTPUT_DIR=/log/clean/date=$yesterday

HIVE_DB=log_case
HIVE_TB=use_tb

SQOOP_HOME=/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6

########################################
#       load the data into hdfs        #
########################################

#show the yesterday date
echo "[**yesterday is $yesterday**]"

#create the hdfs_path
$HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_DIR >/dev/null 2>&1
$HADOOP_HOME/bin/hdfs dfs -rm -r $OUTPUT_DIR >/dev/null 2>&1
$HADOOP_HOME/bin/hdfs dfs -mkdir $HDFS_DIR

#put the data to hdfs
$HADOOP_HOME/bin/hdfs dfs -put $LOG_DIR/$FILE $HDFS_DIR
echo "[**the file $FILE is put to $HDFS_DIR**]"

########################################
#        clean the source data         #
########################################

$HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_DIR $OUTPUT_DIR
echo "[**the file $FILE is cleaned**]"

########################################
#     load the cleaned data to hive    #
########################################

$HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DB"
$HIVE_HOME/bin/hive -e "create external table if not exists $HIVE_DB.$HIVE_TB(ip string,time string,url string) partitioned by (date string) row format delimited fields terminated by ' ' location '/log/clean'"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "alter table $HIVE_TB drop partition (date='$yesterday')"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "alter table $HIVE_TB add partition (date='$yesterday')"
echo "[**add a partition $yesterday to $HIVE_DB.$HIVE_TB**]"

########################################
#     analyse the data using hive      #
########################################

##PV
echo "-------------------------pv start---------------------------------------"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "CREATE TABLE if not exists pv_tb(pv string) row format delimited fields terminated by ' '"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "insert overwrite table pv_tb SELECT COUNT(1) FROM $HIVE_TB WHERE date='$yesterday'"
echo "-------------------------pv finished------------------------------------"

##register
echo "-------------------------rg start---------------------------------------"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "create table if not exists register_tb(rg string) row format delimited fields terminated by ' '"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "insert overwrite table register_tb select count(1) from $HIVE_TB where date='$yesterday' and instr(url,'member.php?mod=register')>0"
echo "-------------------------rg finished------------------------------------"

##ip
echo "-------------------------ip start---------------------------------------"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "CREATE TABLE if not exists ip_tb(ip string) row format delimited fields terminated by ' '"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "insert overwrite table ip_tb select count(distinct ip) from $HIVE_TB where date='$yesterday'"
echo "-------------------------ip finished------------------------------------"

##jump
echo "-------------------------jp start---------------------------------------"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "CREATE TABLE if not exists jump_tb(jp string) row format delimited fields terminated by ' '"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "insert overwrite table jump_tb select count(1) from (select count(ip) ip_single from $HIVE_TB where date='$yesterday' group by ip having ip_single=1) jump"
echo "-------------------------jp finished------------------------------------"

##result
echo "**************************create the result table************************"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "create table if not exists result(day string,pv string,register string,ip string,jump string) row format delimited fields terminated by ' '"
$HIVE_HOME/bin/hive --database $HIVE_DB -e "insert overwrite table result select '$yesterday',a.pv,b.rg,c.ip,d.jp from pv_tb a join register_tb b on 1=1 join ip_tb c on 1=1 join jump_tb d on 1=1"

##export to mysql
$SQOOP_HOME/bin/sqoop --options-file /opt/datas/logs/sqoop.file
5. Source code of the log-cleaning jar
package org.apache.hadoop.log.project;

import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class LogClean extends Configured implements Tool {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new LogClean(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "logclean");
        // make the job runnable from a packaged jar
        job.setJarByClass(LogClean.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        boolean success = job.waitForCompletion(true);
        if (success) {
            System.out.println("Clean process success!");
        } else {
            System.out.println("Clean process failed!");
        }
        return success ? 0 : 1;
    }

    static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        protected void map(
                LongWritable key,
                Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());

            // step 1: filter out requests for static resources
            if (parsed[2].startsWith("GET /static/")
                    || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // step 2: strip the leading method and slash
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // step 3: strip the trailing protocol string
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length()
                        - " HTTP/1.1".length());
            }
            // step 4: keep only the first three fields (ip, time, url)
            outputValue.set(parsed[0] + " " + parsed[1] + " " + parsed[2]);
            context.write(key, outputValue);
        }
    }

    static class MyReducer extends
            Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(
                LongWritable k2,
                java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

    /*
     * Log parser class
     */
    static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                "yyyyMMddHHmmss");

        public static void main(String[] args) throws ParseException {
            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
            LogParser parser = new LogParser();
            final String[] array = parser.parse(S1);
            System.out.println("Sample line: " + S1);
            System.out.format(
                    "Parse result: ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                    array[0], array[1], array[2], array[3], array[4]);
        }

        /**
         * Parse the English-locale time string.
         *
         * @param string
         * @return
         * @throws ParseException
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Parse one log line.
         *
         * @param line
         * @return an array of five elements: ip, time, url, status, traffic
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);

            return new String[] { ip, time, url, status, traffic };
        }

        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String status = trim.split(" ")[0];
            return status;
        }

        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }
}
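For reference, the logclean.jar used by the script can be packaged from this single source file. A minimal build sketch, assuming the source sits under /opt/datas/logs/src with the usual package directory layout and that the hadoop client command is on the PATH (both paths are illustrative):

# Hedged build sketch: compile against the Hadoop client classpath, then package
# the jar to the path expected by the section 4 script ($LOG_DIR/logclean.jar).
cd /opt/datas/logs/src
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes org/apache/hadoop/log/project/LogClean.java
jar cf /opt/datas/logs/logclean.jar -C classes .

The script then submits it with yarn jar, passing the HDFS source directory and the cleaned output directory as the two arguments to LogClean.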