zoukankan      html  css  js  c++  java
  • MR框架-->ETL

    • ETL介绍和实现

    1.ETL的介绍:

        在上面的案例中我们的操作虽然完成,但是我们发现一个问题,都是对一个166mb大小的文件操作,效率是非常低下的,ETL是用来描述:将数据从来源段经过抽取(extract)  转换(transform) 加载(load)  到目的端的过程。

    2.ETL的案例

        针对日志文件,截取我们需要的字段,去除不必要的字段

    public static class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                //读取一行日志
                String log = value.toString();
                //通过LogParser解析获取这些字段
                Map<String, String> map = new LogParser().parse(log); 
                String ip = map.get("ip");
                String url = map.get("url");
                String sessionId = map.get("sessionId");
                String  time = map.get("time");
                //通过IPParser获取国家省市
                RegionInfo analyseIp = IPParser.getInstance().analyseIp(ip);
                
                String counttry = analyseIp.getCountry() == null ? "-" : analyseIp.getCountry();
                String province = analyseIp.getProvince() == null ? "-" : analyseIp.getProvince();
                String city   =      analyseIp.getCity() == null ? "-" : analyseIp.getCity();
                //Buffer
                StringBuffer sb = new StringBuffer();
                sb.append(ip +"	");
                sb.append(url +"	");
                sb.append(sessionId +"	");
                sb.append(time +"	");
                sb.append(counttry +"	");
                sb.append(province +"	");
                sb.append(city);
                
                //写入上下文
                context.write(new Text(sb.toString()), NullWritable.get());
            }
        }

    提交主类:

    public static void main(String[] args) throws Exception {
            // 加载配置文件
            Configuration conf = new Configuration();
            //创建hdfs对象
            FileSystem fs = FileSystem.get(conf);
            //判断输出路径是否重复
            if(fs.exists(new Path(args[1]))) {
                fs.delete(new Path(args[1]),true);
            }
            // 创建Job对象
            Job job = Job.getInstance(conf);
            // 设置提交主类
            job.setJarByClass(ETLApp.class);
            // 设置Mapper类相关的参数
            job.setMapperClass(ETLMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // 设置输入路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // 设置输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 提交任务
            job.waitForCompletion(true);
        }
  • 相关阅读:
    HDU 4814
    POJ 3415
    HDU 4941
    C scanf()
    hdu 4850 Wow! Such String!
    HDU 4828 Grids
    HDU 4832 Chess
    HDU 4831
    SpringCloud 网飞系 转换阿里系2
    用jianmu建木自动化打包vue前端应用,并远程ssh建立文件夹,scp文件至对应目录
  • 原文地址:https://www.cnblogs.com/wyk1/p/13955449.html
Copyright © 2011-2022 走看看