在eclipse软件里创建一个maven项目
jdk要换成本地安装的1.8版本的
加载pom.xml文件
<!-- Maven build descriptor for the clickLog module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Project coordinates -->
  <groupId>com.it19gong</groupId>
  <artifactId>clickLog</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>clickLog</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Unit testing (test scope only) -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>

    <!--
      tools.jar is taken from the local JDK through a system-scoped dependency,
      not from a repository. systemPath is machine-specific: point it at the
      lib/tools.jar of the JDK 1.8 installed on your own machine.
    -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>E:/software/jdk1.8/lib/tools.jar</systemPath>
    </dependency>

    <!-- Hive JDBC driver -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.33</version>
    </dependency>
  </dependencies>
</project>
加载依赖包时,如果报错提示在仓库里找不到 jdk.tools 1.8(它不是从 Maven 仓库下载的,而是取自本地 JDK),
就把 pom.xml 中 jdk.tools 依赖的 <systemPath> 改成本地 JDK 的 lib/tools.jar 的绝对路径,然后重新加载一次 Maven 的依赖包
我这里修改成 E:/software/jdk1.8/lib/tools.jar(即上面 pom.xml 中 <systemPath> 的值)
在项目下新建AccessLogPreProcessMapper类
package com.it19gong.clickLog; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Text text = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String itr[] = value.toString().split(" "); if (itr.length < 11) { return; } String ip = itr[0]; String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]); String url = itr[6]; String upFlow = itr[9]; text.set(ip+","+date+","+url+","+upFlow); context.write(text, NullWritable.get()); } }
创建AnalysisNginxTool类
package com.it19gong.clickLog;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts the timestamp token of an nginx access-log line, for example
 * {@code [18/Sep/2013:06:49:18}, into other representations.
 *
 * <p>Neither public method throws on bad input: failures are logged and a
 * neutral value (empty string or {@code 0}) is returned instead.
 */
public class AnalysisNginxTool {

    private static final Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

    /** Layout of the timestamp token; the leading '[' is matched literally. */
    private static final String NGINX_PATTERN = "[dd/MMM/yyyy:HH:mm:ss";

    /**
     * Formats the timestamp token as a {@code yyyy/MM/dd} date.
     *
     * @param date timestamp token; anything after the first space is ignored
     * @return the formatted date, or an empty string if the token cannot be parsed
     */
    public static String nginxDateStmpToDate(String date) {
        Date parsed = parseNginxTimestamp(date);
        if (parsed == null) {
            return "";
        }
        return new SimpleDateFormat("yyyy/MM/dd").format(parsed);
    }

    /**
     * Converts the timestamp token to the value of {@link Date#getTime()}.
     *
     * @param date timestamp token; anything after the first space is ignored
     * @return the time in milliseconds, or {@code 0} if the token cannot be parsed
     */
    public static long nginxDateStmpToDateTime(String date) {
        Date parsed = parseNginxTimestamp(date);
        return parsed == null ? 0 : parsed.getTime();
    }

    /**
     * Parses the timestamp token shared by both public methods.
     *
     * @return the parsed date, or {@code null} (after logging) on null or malformed input
     */
    private static Date parseNginxTimestamp(String date) {
        if (date == null) {
            logger.error("error:" + date);
            return null;
        }
        // Keep only the part before the first space (the zone offset, if present, follows it).
        int space = date.indexOf(' ');
        String token = space < 0 ? date : date.substring(0, space);
        try {
            // A new formatter per call: SimpleDateFormat instances must not be shared
            // between threads. Month names are matched with English symbols so the
            // result does not depend on the JVM's default locale.
            SimpleDateFormat df = new SimpleDateFormat(NGINX_PATTERN, Locale.ENGLISH);
            return df.parse(token);
        } catch (ParseException e) {
            logger.error("error:" + date, e);
            return null;
        }
    }
}
创建DateToNUM类
package com.it19gong.clickLog; import java.util.HashMap; public class DateToNUM { public static HashMap map = new HashMap(); public static void initMap() { map.put("JAN", "01"); map.put("FEB", "02"); map.put("MAR", "03"); map.put("APR", "04"); map.put("MAY", "05"); map.put("JUN", "06"); map.put("JUL", "07"); map.put("AUG", "08"); map.put("SEPT", "09"); map.put("OCT", "10"); map.put("NOV", "11"); map.put("DEC", "12"); } }
新建AccessLogDriver类
package com.it19gong.clickLog; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class AccessLogDriver { public static void main(String[] args) throws Exception { DateToNUM.initMap(); Configuration conf = new Configuration(); if(args.length != 2){ args = new String[2]; args[0] = "hdfs://node1/data/clickLog/20190620/"; args[1] = "hdfs://node1/uvout/hive" ; } Job job = Job.getInstance(conf); // 设置一个用户定义的job名称 job.setJarByClass(AccessLogDriver.class); job.setMapperClass(AccessLogPreProcessMapper.class); // 为job设置Mapper类 // 为job设置Reducer类 job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class);// 为job的输出数据设置Key类 job.setMapOutputValueClass(NullWritable.class);// 为job输出设置value类 FileInputFormat.addInputPath(job, new Path(args[0])); // 为job设置输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 为job设置输出路径 System.exit(job.waitForCompletion(true) ? 0 : 1); // 运行job } }
把工程打包成 Jar 包(下文运行命令中使用的文件名是 mrclick.jar)
把jar包上传到集群
在集群上运行一下,先检查一下集群的启动进程
hadoop jar mrclick.jar com.it19gong.clickLog.AccessLogDriver
可以看到输出目录
查看清洗后的数据