zoukankan      html  css  js  c++  java
  • 【Hadoop】3、Hadoop-MapReduce使用avro进行数据的序列化与反序列化

    package cn.cutter.demo.hadoop.avro;
    
    import org.apache.hadoop.io.Text;
    
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Parses fixed-width weather records (NCDC format, per the class name) and
     * exposes the extracted fields through getters.
     *
     * <p>The parser is stateful: every call to {@code parse} overwrites the values
     * returned by the getters, so a single instance can be reused for many records.
     * An instance must not be shared between threads.
     *
     * <p>Created 2019/2/17.
     *
     * @author xiaof
     * @version 1.0
     */
    public class NcdcRecordParser {
        /** Sentinel temperature value that marks a reading as missing. */
        private static final int MISSING_TEMPERATURE = 9999;

        /** Single-character quality codes for which a reading is treated as valid. */
        private static final String VALID_QUALITY_CODES = "01459";

        /**
         * Format of the 12-character observation timestamp. {@link SimpleDateFormat}
         * is not thread-safe, so every use of this shared instance is synchronized.
         * NOTE(review): the formatter uses the JVM default time zone - confirm that
         * this matches the time zone of the input data.
         */
        private static final DateFormat DATE_FORMAT =
                new SimpleDateFormat("yyyyMMddHHmm");

        private String stationId;
        private String observationDateString;
        private String year;
        private String airTemperatureString;
        private int airTemperature;
        private boolean airTemperatureMalformed;
        private String quality;

        /**
         * Extracts station id, observation timestamp, year, air temperature and
         * quality code from one record, replacing any previously parsed state.
         *
         * <p>The temperature field must start with {@code '+'} or {@code '-'} at
         * index 87; otherwise the record is flagged as malformed and the temperature
         * values are cleared instead of keeping those of the previous record.
         *
         * @param record one fixed-width record, at least 93 characters long
         * @throws StringIndexOutOfBoundsException if the record is too short
         * @throws NumberFormatException if the signed temperature field is not numeric
         */
        public void parse(String record) {
            stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
            observationDateString = record.substring(15, 27);
            year = record.substring(15, 19);

            char sign = record.charAt(87);
            if (sign == '+') {
                // Skip the leading plus sign so that only the four digits are parsed.
                airTemperatureString = record.substring(88, 92);
                airTemperature = Integer.parseInt(airTemperatureString);
                airTemperatureMalformed = false;
            } else if (sign == '-') {
                airTemperatureString = record.substring(87, 92);
                airTemperature = Integer.parseInt(airTemperatureString);
                airTemperatureMalformed = false;
            } else {
                // No sign character: flag the record and drop stale values. The
                // temperature is deliberately not parsed again after this branch,
                // because there is nothing valid to parse.
                airTemperatureString = null;
                airTemperature = 0;
                airTemperatureMalformed = true;
            }
            quality = record.substring(92, 93);
        }

        /**
         * Parses a record held in a Hadoop {@link Text} by delegating to
         * {@link #parse(String)} with its string form.
         *
         * @param record the record to parse
         */
        public void parse(Text record) {
            parse(record.toString());
        }

        /**
         * Tells whether the last parsed record carries a usable temperature.
         *
         * @return {@code true} if the temperature is well-formed, is not the missing
         *         sentinel, and the quality code is one of the accepted codes
         */
        public boolean isValidTemperature() {
            return !airTemperatureMalformed
                    && airTemperature != MISSING_TEMPERATURE
                    && quality.length() == 1
                    && VALID_QUALITY_CODES.indexOf(quality.charAt(0)) >= 0;
        }

        /**
         * @return {@code true} if the temperature field of the last parsed record
         *         did not start with a sign character
         */
        public boolean isMalformedTemperature() {
            return airTemperatureMalformed;
        }

        /**
         * @return {@code true} if the last parsed temperature equals the missing
         *         sentinel value
         */
        public boolean isMissingTemperature() {
            return airTemperature == MISSING_TEMPERATURE;
        }

        /** @return the station id in the form {@code <6 chars>-<5 chars>} */
        public String getStationId() {
            return stationId;
        }

        /**
         * Converts the observation timestamp of the last parsed record to a
         * {@link Date}.
         *
         * @return the observation time
         * @throws IllegalArgumentException if the timestamp does not match the
         *         {@code yyyyMMddHHmm} pattern; the {@link ParseException} is the cause
         */
        public Date getObservationDate() {
            try {
                // The formatter is shared by all parser instances, so guard it.
                synchronized (DATE_FORMAT) {
                    return DATE_FORMAT.parse(observationDateString);
                }
            } catch (ParseException e) {
                throw new IllegalArgumentException(e);
            }
        }

        /** @return the four-character year exactly as it appears in the record */
        public String getYear() {
            return year;
        }

        /**
         * @return the year as an integer
         * @throws NumberFormatException if the year field is not numeric
         */
        public int getYearInt() {
            return Integer.parseInt(year);
        }

        /**
         * @return the parsed air temperature, or {@code 0} if the last record was
         *         malformed
         */
        public int getAirTemperature() {
            return airTemperature;
        }

        /**
         * @return the temperature text that was parsed (without a leading plus
         *         sign), or {@code null} if the last record was malformed
         */
        public String getAirTemperatureString() {
            return airTemperatureString;
        }

        /** @return the one-character quality code of the last parsed record */
        public String getQuality() {
            return quality;
        }

    }

    通过avro输出数据,我们的数据集是:

    0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
    0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
    0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
    0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
    0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
    package cn.cutter.demo.hadoop.avro;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.*;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    
    /**
     * @ClassName AvroGenericMaxTemperature
     * @Description 通过avro记录的数据文件,使用MapReduce进行解析读取数据
     * @Author xiaof
     * @Date 2019/2/17 15:23
     * @Version 1.0
     **/
    public class AvroGenericMaxTemperature extends Configured implements Tool {
    
        //转换json数据,avro的模式解析
        private static final Schema SCHEMA = new Schema.Parser().parse("{"name":"WeatherRecord","doc":"A weather reading","type":"record","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]}");
    
        public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, AvroKey<Integer>, AvroValue<GenericRecord>> {
            //创建天气解析类对象,数据记录对象
            private NcdcRecordParser ncdcRecordParser = new NcdcRecordParser();
            private GenericRecord genericRecord = new GenericData.Record(SCHEMA);
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //1.解析数据
                ncdcRecordParser.parse(value.toString());
    
                //2.判断数据有效性
                if(ncdcRecordParser.isValidTemperature()) {
                    //3.获取对应的数据进入记录record中,然后输出到上下文对象
                    genericRecord.put("year", ncdcRecordParser.getYearInt());
                    genericRecord.put("temperature", ncdcRecordParser.getAirTemperature());
                    genericRecord.put("stationId", ncdcRecordParser.getStationId());
                    context.write(new AvroKey<Integer>(ncdcRecordParser.getYearInt()), new AvroValue<GenericRecord>(genericRecord));
                }
            }
        }
    
        public static class MaxTemperatureReducer extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
            @Override
            protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
                //遍历所有的数据,获取最大的数据
                GenericRecord max = null;
                for(AvroValue<GenericRecord> value : values) {
                    //获取数据的值,判断max等于空,或者当前温度大于max记录的温度,那么就更新max
                    GenericRecord record = value.datum();
                    if(max == null || (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
                        max = newWeatherRecord(record);
                    }
                }
    
                context.write(new AvroKey(max), NullWritable.get());
            }
    
            private GenericRecord newWeatherRecord(GenericRecord value) {
                GenericRecord record = new GenericData.Record(SCHEMA);
                record.put("year", value.get("year"));
                record.put("temperature", value.get("temperature"));
                record.put("stationId", value.get("stationId"));
    
                return record;
            }
        }
    
    
        @Override
        public int run(String[] strings) throws Exception {
            if (strings.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>
    ",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            Job job = Job.getInstance(this.getConf(), "Max temperature");
            job.setJarByClass(this.getClass());
    
            job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
    
            FileInputFormat.addInputPath(job, new Path(strings[0]));
            FileOutputFormat.setOutputPath(job, new Path(strings[1]));
    
            AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
            AvroJob.setMapOutputValueSchema(job, SCHEMA);
            AvroJob.setOutputKeySchema(job, SCHEMA);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(AvroKeyOutputFormat.class);
    
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setReducerClass(MaxTemperatureReducer.class);
    
            return job.waitForCompletion(true) ? 0: 1;
    
        }
    
        public static void main(String[] args) throws Exception {
    
            System.setProperty("hadoop.home.dir", "F:\hadoop-2.7.7");
            String paths[] = {"H:\ideaworkspace\1-tmp\input\1.txt", "H:\ideaworkspace\1-tmp\output1"};
    
            int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), paths);
            System.exit(exitCode);
    
        }
    }

    结果使用avro-tool进行查看:

    H:\>java -jar avro-tools-1.8.2.jar tojson H:\ideaworkspace\1-tmp\output1\part-r-00000.avro

     

  • 相关阅读:
    设计模式——单例模式的一种比较好的写法
    设计模式——观察者模式
    Java中的注解原来是这么用的
    TCP三次握手 四次挥手
    Mat转IplImage IplImage转Mat
    《Android开发艺术探索》读书笔记——Cha3.2.2使用动画实现View的滑动
    11第十二天DBUtils
    Java中几种对象(PO,BO,VO,DTO,POJO,DAO,Entity,JavaBean,JavaBeans)
    10第十一天JDBC事务控制管理
    09重点复习
  • 原文地址:https://www.cnblogs.com/cutter-point/p/10396926.html
Copyright © 2011-2022 走看看