zoukankan      html  css  js  c++  java
  • 数据清洗

    一、需求

    去掉字段个数小于11的日志记录(每行按空白字符切分,字段数不足11的行视为无效日志并丢弃)

    二、代码

    1、Mapper

    package com.wt.etl;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Map-side log cleaner: keeps only the log lines that contain at least
     * {@link #MIN_FIELDS} whitespace-separated fields and emits each kept line
     * unchanged as the output key (the value is {@link NullWritable}).
     *
     * <p>Kept and dropped lines are tallied in the counter group {@code "map"}
     * under the names {@code "true"} and {@code "false"} respectively.
     */
    public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        /** Minimum number of fields a line needs in order to be kept. */
        private static final int MIN_FIELDS = 11;

        /** Output key, reused across map() calls to avoid allocating one Text per record. */
        Text k = new Text();

        /**
         * Filters one input line and writes it to the output if it is valid.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of the raw log
         * @param context task context used for counters and output
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Deliberately no super.map(...) call: the inherited implementation would
            // emit every input record as-is, bypassing the filter below.
            String line = value.toString();
            if (!parseLog(line, context)) {
                // Invalid line: drop it.
                return;
            }
            k.set(line);
            context.write(k, NullWritable.get());
        }

        /**
         * Decides whether a log line is valid and updates the matching counter.
         *
         * @param line    the raw log line
         * @param context task context used to increment the counters
         * @return {@code true} if the line has at least {@link #MIN_FIELDS} fields
         *         and should be kept, {@code false} if it should be discarded
         */
        private boolean parseLog(String line, Context context) {
            // "\\s" is the regex for a single whitespace character.
            String[] fields = line.split("\\s");
            if (fields.length >= MIN_FIELDS) {
                // Job counter: number of lines kept.
                context.getCounter("map", "true").increment(1);
                return true;
            }
            // Job counter: number of lines discarded.
            context.getCounter("map", "false").increment(1);
            return false;
        }
    }

    2、Driver

    package com.wt.etl;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class ETLDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "E:\a\input2", "E:\a\output2" };
    
            // 1 获取job信息
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 2 加载jar包
            job.setJarByClass(ETLDriver.class);
    
            // 3 关联map
            job.setMapperClass(ETLMapper.class);
    
            // 4 设置最终输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 设置reducetask个数为0
            job.setNumReduceTasks(0);
    
            // 5 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 6 提交
            boolean wait = job.waitForCompletion(true);
            System.exit(wait? 0:1);
        }
    }
  • 相关阅读:
    Jmeter实现ajax异步同时发送请求
    数据构造技术框架的搭建及使用
    Maven安装与使用
    TFS2008安装环境
    ORACLE隐式提交导致的ORA01086错误:SAVEPOINT“丢失”
    关于记忆与学习
    ORACLE中异常处理
    【笔记:ORACLE基础】正则表达式
    malloc()和relloc()的用法【转】
    【笔记:ORACLE基础】用户管理
  • 原文地址:https://www.cnblogs.com/wt7018/p/13649577.html
Copyright © 2011-2022 走看看