zoukankan      html  css  js  c++  java
  • Hadoop2.4.1 使用MapReduce简单的数据清洗

    package com.bank.service;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * 将非结构化的数据处理为结构化数据
     * @author mengyao
     *
     */
    import com.bank.entity.CNY;

    public class CnyDataFormat extends Configured implements Tool {

        static class CnyDataFormatMapper extends Mapper<LongWritable, Text, NullWritable, CNY>{
            
            CNY cny = new CNY();
            
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split(" ");
                if (fields.length == 42) {
                    String gzh = fields[12] ;
                    String currency = fields[9];
                    String version = fields[10];
                    String valuta = fields[11];
                    long qfTime;
                    try {
                        qfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(fields[3]+" "+fields[4]).getTime();
                    } catch (ParseException e) {
                        qfTime = System.currentTimeMillis();
                    }
                    int flag = Integer.parseInt(fields[5]);
                    String machineID = fields[13];
                    cny.set(gzh, currency, version, valuta, qfTime, flag, machineID);
                    context.write(NullWritable.get(), cny);
                } else {
                    System.err.println(" ERROR: data format failed!");
                }
            }
        }
        
        static class CnyDataFormatReduce extends Reducer<NullWritable, CNY, NullWritable, CNY>{
            @Override
            protected void reduce(NullWritable key, Iterable<CNY> value, Context context) throws IOException, InterruptedException {
                for (CNY cny : value) {
                    context.write(NullWritable.get(), cny);
                }
            }
        }

        @Override
        public int run(String[] arg0) throws Exception {
            Job job = Job.getInstance(getConf(), CnyDataFormat.class.getSimpleName());
            job.setJarByClass(CnyDataFormat.class);                                //设置main函数所在的类
            
            FileInputFormat.setInputPaths(job, new Path(arg0[0]));
            job.setMapperClass(CnyDataFormatMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(CNY.class);
            
            job.setReducerClass(CnyDataFormatReduce.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(CNY.class);
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
            
            return job.waitForCompletion(true) ? 0 : 1;                                //等待MapReduce执行完成并打印作业进度详情
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (paths.length != 2) {
                System.err.println("Usage: " + CnyDataFormat.class.getName() + " <in> <out>");
                System.exit(2);
            }
            int status = ToolRunner.run(new CnyDataFormat(), args);
            System.exit(status);
            
        }
    }

    package com.bank.entity;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * 实现Hadoop的序列化接口
     * @author mengyao
     *
     */
    public class CNY implements Writable {

        private String gzh;
        private String currency;
        private String version;
        private String valuta;
        private long qfTime;
        private int flag;
        private String machineID;
        
        @Override
        public void readFields(DataInput in) throws IOException {
            this.gzh = in.readUTF();
            this.currency = in.readUTF();
            this.version = in.readUTF();
            this.valuta = in.readUTF();
            this.qfTime = in.readLong();
            this.flag = in.readInt();
            this.machineID = in.readUTF();
        }
        
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.gzh);
            out.writeUTF(this.currency);
            out.writeUTF(this.version);
            out.writeUTF(this.valuta);
            out.writeLong(this.qfTime);
            out.writeInt(this.flag);
            out.writeUTF(this.machineID);
        }

        public void set(String gzh, String currency, String version,
                String valuta, long qfTime, int flag, String machineID) {
            this.gzh = gzh;
            this.currency = currency;
            this.version = version;
            this.valuta = valuta;
            this.qfTime = qfTime;
            this.flag = flag;
            this.machineID = machineID;
        }

        @Override
        public String toString() {
            return this.gzh +" "+ this.currency +" "+ this.version +" "+ this.valuta +" "+ this.qfTime +" "+ this.flag +" "+ this.machineID;
        }

        public String getGzh() {
            return gzh;
        }

        public void setGzh(String gzh) {
            this.gzh = gzh;
        }

        public String getCurrency() {
            return currency;
        }

        public void setCurrency(String currnecy) {
            this.currency = "cny";
        }

        public String getVersion() {
            return version;
        }

        public void setVersion(String version) {
            this.version = version;
        }

        public String getValuta() {
            return valuta;
        }

        public void setValuta(String valuta) {
            this.valuta = valuta;
        }

        public long getQfTime() {
            return qfTime;
        }

        public void setQfTime(long qfTime) {
            this.qfTime = qfTime;
        }

        public int getFlag() {
            return flag;
        }

        public void setFlag(int flag) {
            this.flag = flag;
        }

        public String getMachineID() {
            return machineID;
        }

        public void setMachineID(String machineID) {
            this.machineID = machineID;
        }
       
        
    }

  • 相关阅读:
    【ELK】重置 elasticsearch 的超管 elastic 账号密码
    python两种获取剪贴板内容的方法
    python创建列表和向列表添加元素方法
    7道Python函数相关的练习题
    Python中的那些“坑”
    Python基础教程:copy()和deepcopy()
    Python教程:optparse—命令行选项解析器
    Python教程:面向对象编程的一些知识点总结
    第一个pypi项目发布成功
    一个Markdown文件处理程序
  • 原文地址:https://www.cnblogs.com/mengyao/p/4226815.html
Copyright © 2011-2022 走看看