- ETL: introduction and implementation
1. Introduction to ETL:
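The operations in the cases above all completed, but they share one problem: every job works on the same 166 MB file, which is very inefficient. ETL describes the process of moving data from a source to a destination through three steps: extract, transform, and load.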
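The three steps can be illustrated with a small in-memory sketch. It is not part of the original project: the class name `EtlSketch`, the comma-separated sample format, and the rule of keeping only the first two fields are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal in-memory illustration of extract -> transform -> load.
// Every name and the sample record format here are hypothetical.
class EtlSketch {

    // Extract: read the raw records from the source (here, an in-memory list).
    static List<String> extract(List<String> source) {
        return new ArrayList<>(source);
    }

    // Transform: skip blank or malformed records and keep only the first two fields.
    static List<String> transform(List<String> raw) {
        List<String> cleaned = new ArrayList<>();
        for (String line : raw) {
            if (line == null || line.trim().isEmpty()) {
                continue; // blank record
            }
            String[] fields = line.trim().split(",");
            if (fields.length < 2) {
                continue; // malformed record
            }
            cleaned.add(fields[0] + "," + fields[1]);
        }
        return cleaned;
    }

    // Load: append the cleaned records to the destination.
    static void load(List<String> records, List<String> destination) {
        destination.addAll(records);
    }

    // Runs the three steps in order and returns the destination contents.
    static List<String> run(List<String> source) {
        List<String> destination = new ArrayList<>();
        load(transform(extract(source)), destination);
        return destination;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList(
                "10.0.0.1,/index,unused,unused",
                "",
                "10.0.0.2,/cart,unused");
        System.out.println(run(source));
    }
}
```

Later jobs that read the destination see only the smaller, cleaned records, which is the motivation for running an ETL step before the analysis jobs.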
2. An ETL example
For the log file, extract the fields we need and drop the ones we do not.
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// LogParser, IPParser and RegionInfo are this project's own helper classes.
public static class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of the log
        String log = value.toString();

        // Parse the line into named fields with LogParser
        Map<String, String> map = new LogParser().parse(log);
        String ip = map.get("ip");
        String url = map.get("url");
        String sessionId = map.get("sessionId");
        String time = map.get("time");

        // Look up country, province and city with IPParser; use "-" when a value is missing
        RegionInfo analyseIp = IPParser.getInstance().analyseIp(ip);
        String country = analyseIp.getCountry() == null ? "-" : analyseIp.getCountry();
        String province = analyseIp.getProvince() == null ? "-" : analyseIp.getProvince();
        String city = analyseIp.getCity() == null ? "-" : analyseIp.getCity();

        // Join the selected fields into one output line
        StringBuilder sb = new StringBuilder();
        sb.append(ip).append(" ");
        sb.append(url).append(" ");
        sb.append(sessionId).append(" ");
        sb.append(time).append(" ");
        sb.append(country).append(" ");
        sb.append(province).append(" ");
        sb.append(city);

        // Write the record to the context; the value is empty
        context.write(new Text(sb.toString()), NullWritable.get());
    }
}
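The record-building step of the mapper can be tried without a Hadoop cluster. The following is a minimal sketch in plain Java; the class `RecordBuilderSketch`, its method names, and the sample values are hypothetical. Unlike the mapper, which applies the "-" fallback only to the region fields, this sketch applies it to every field, which is a choice of the sketch and not something the original code does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java sketch of assembling one ETL output line.
// Class name, method names and sample values are hypothetical.
class RecordBuilderSketch {

    static final String MISSING = "-";

    // Returns the placeholder when a value is null or empty.
    static String orDash(String value) {
        return (value == null || value.isEmpty()) ? MISSING : value;
    }

    // Joins the selected log fields and the region fields with single spaces.
    static String buildRecord(Map<String, String> fields,
                              String country, String province, String city) {
        StringJoiner joiner = new StringJoiner(" ");
        for (String name : new String[] {"ip", "url", "sessionId", "time"}) {
            joiner.add(orDash(fields.get(name)));
        }
        joiner.add(orDash(country));
        joiner.add(orDash(province));
        joiner.add(orDash(city));
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("ip", "10.0.0.1");
        fields.put("url", "/index");
        fields.put("sessionId", "s1");
        // "time" is deliberately left out to show the fallback.
        System.out.println(buildRecord(fields, "CountryA", null, "CityB"));
    }
}
```

Keeping the assembly logic in a small static method like this makes it easy to unit test separately from the `Mapper` class.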
The driver class that submits the job:
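import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    // Load the configuration
    Configuration conf = new Configuration();

    // Get a handle to the file system (HDFS)
    FileSystem fs = FileSystem.get(conf);

    // Delete the output path if it already exists
    Path output = new Path(args[1]);
    if (fs.exists(output)) {
        fs.delete(output, true);
    }

    // Create the Job object
    Job job = Job.getInstance(conf);

    // Set the main class used to locate the job jar
    job.setJarByClass(ETLApp.class);

    // Set the Mapper class and its output types
    job.setMapperClass(ETLMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);

    // Set the input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Set the output path
    FileOutputFormat.setOutputPath(job, output);

    // Submit the job, wait for it to finish, and report success through the exit code
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}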
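The driver removes the output path before submitting because a MapReduce job using `FileOutputFormat` refuses to start when its output directory already exists. The same "delete if present" step can be sketched on the local file system with `java.nio.file` in place of the HDFS `FileSystem` API; the class `OutputDirSketch` and its method name are hypothetical, and this is a local analogue, not the HDFS call itself.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-file-system analogue of the "delete the output path if it exists" step.
// Uses java.nio.file, not the HDFS FileSystem API; all names are hypothetical.
class OutputDirSketch {

    // Recursively deletes dir if it exists; returns true when something was deleted.
    static boolean deleteIfPresent(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> all;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : all) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Create a throwaway directory that stands in for a previous job's output.
        Path out = Files.createTempDirectory("etl-output");
        Files.write(out.resolve("part-r-00000"), "old result".getBytes());

        System.out.println("deleted: " + deleteIfPresent(out));
        System.out.println("still exists: " + Files.exists(out));
    }
}
```

Deleting the output unconditionally is convenient while developing, but it silently discards earlier results, so a production job might prefer to fail or write to a fresh path instead.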