package cn.cutter.demo.hadoop.avro;

import org.apache.hadoop.io.Text;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName NcdcRecordParser
 * @Description Parses a fixed-width NCDC weather record line.
 * @Author xiaof
 * @Date 2019/2/17 16:36
 * @Version 1.0
 **/
public class NcdcRecordParser {

    private static final int MISSING_TEMPERATURE = 9999;
    private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmm");

    private String stationId;
    private String observationDateString;
    private String year;
    private String airTemperatureString;
    private int airTemperature;
    private boolean airTemperatureMalformed;
    private String quality;

    public void parse(String record) {
        stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
        observationDateString = record.substring(15, 27);
        year = record.substring(15, 19);
        airTemperatureMalformed = false;
        // Strip the leading plus sign, since Integer.parseInt doesn't accept it
        if (record.charAt(87) == '+') {
            airTemperatureString = record.substring(88, 92);
            airTemperature = Integer.parseInt(airTemperatureString);
        } else if (record.charAt(87) == '-') {
            airTemperatureString = record.substring(87, 92);
            airTemperature = Integer.parseInt(airTemperatureString);
        } else {
            airTemperatureMalformed = true;
        }
        quality = record.substring(92, 93);
    }

    public void parse(Text record) {
        parse(record.toString());
    }

    public boolean isValidTemperature() {
        return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
                && quality.matches("[01459]");
    }

    public boolean isMalformedTemperature() {
        return airTemperatureMalformed;
    }

    public boolean isMissingTemperature() {
        return airTemperature == MISSING_TEMPERATURE;
    }

    public String getStationId() {
        return stationId;
    }

    public Date getObservationDate() {
        try {
            return DATE_FORMAT.parse(observationDateString);
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public String getYear() {
        return year;
    }

    public int getYearInt() {
        return Integer.parseInt(year);
    }

    public int getAirTemperature() {
        return airTemperature;
    }

    public String getAirTemperatureString() {
        return airTemperatureString;
    }

    public String getQuality() {
        return quality;
    }
}
We write the job output as Avro data. The input data set (NCDC weather records, one fixed-width record per line) is:
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
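The parser above pulls the station id, observation date, temperature, and quality flag out of fixed column ranges in each line. As a quick sanity check (a minimal sketch, not part of the original job; the class name is illustrative), parsing the first sample record gives:

public class NcdcRecordParserDemo {
    public static void main(String[] args) {
        // First sample record from the data set above
        String line = "0067011990999991950051507004+68750+023550FM-12+038299999"
                + "V0203301N00671220001CN9999999N9+00001+99999999999";
        NcdcRecordParser parser = new NcdcRecordParser();
        parser.parse(line);
        System.out.println(parser.getStationId());       // 011990-99999
        System.out.println(parser.getYear());            // 1950
        System.out.println(parser.getAirTemperature());  // 0 (tenths of a degree Celsius)
        System.out.println(parser.isValidTemperature()); // true
    }
}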
package cn.cutter.demo.hadoop.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName AvroGenericMaxTemperature
 * @Description Reads text weather records with MapReduce and writes the
 *              per-year maximum temperature as Avro data.
 * @Author xiaof
 * @Date 2019/2/17 15:23
 * @Version 1.0
 **/
public class AvroGenericMaxTemperature extends Configured implements Tool {

    // Avro schema for the output records, parsed from its JSON definition
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"name\":\"WeatherRecord\",\"doc\":\"A weather reading\"," +
            "\"type\":\"record\",\"fields\":[" +
            "{\"name\":\"year\",\"type\":\"int\"}," +
            "{\"name\":\"temperature\",\"type\":\"int\"}," +
            "{\"name\":\"stationId\",\"type\":\"string\"}]}");

    public static class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, AvroKey<Integer>, AvroValue<GenericRecord>> {

        // Parser for the raw weather lines and a reusable Avro record
        private NcdcRecordParser ncdcRecordParser = new NcdcRecordParser();
        private GenericRecord genericRecord = new GenericData.Record(SCHEMA);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1. Parse the input line
            ncdcRecordParser.parse(value.toString());
            // 2. Skip invalid readings
            if (ncdcRecordParser.isValidTemperature()) {
                // 3. Fill the Avro record and emit it keyed by year
                genericRecord.put("year", ncdcRecordParser.getYearInt());
                genericRecord.put("temperature", ncdcRecordParser.getAirTemperature());
                genericRecord.put("stationId", ncdcRecordParser.getStationId());
                context.write(new AvroKey<Integer>(ncdcRecordParser.getYearInt()),
                        new AvroValue<GenericRecord>(genericRecord));
            }
        }
    }

    public static class MaxTemperatureReducer
            extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {

        @Override
        protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>> values, Context context)
                throws IOException, InterruptedException {
            // Scan all records for this year and keep the one with the highest temperature
            GenericRecord max = null;
            for (AvroValue<GenericRecord> value : values) {
                // If max is null, or the current temperature exceeds max's, update max.
                // Copy the record, since the framework reuses the object behind each value.
                GenericRecord record = value.datum();
                if (max == null
                        || (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
                    max = newWeatherRecord(record);
                }
            }
            context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
        }

        private GenericRecord newWeatherRecord(GenericRecord value) {
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("year", value.get("year"));
            record.put("temperature", value.get("temperature"));
            record.put("stationId", value.get("stationId"));
            return record;
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        if (strings.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>%n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = Job.getInstance(this.getConf(), "Max temperature");
        job.setJarByClass(this.getClass());
        job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        // Declare the Avro schemas for the map output and the final output
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setMapOutputValueSchema(job, SCHEMA);
        AvroJob.setOutputKeySchema(job, SCHEMA);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "F:\\hadoop-2.7.7");
        String[] paths = {"H:\\ideaworkspace\\1-tmp\\input\\1.txt", "H:\\ideaworkspace\\1-tmp\\output1"};
        int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), paths);
        System.exit(exitCode);
    }
}
View the result with avro-tools:
H:\>java -jar avro-tools-1.8.2.jar tojson H:\ideaworkspace\1-tmp\output1\part-r-00000.avro
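If avro-tools is not at hand, the output can also be read programmatically with Avro's DataFileReader, which recovers the schema from the file's own metadata. A minimal sketch (the class name is illustrative; the path matches the job output above, adjust as needed). For the five sample records, this should print one record per year: temperature 111 for 1949 and 22 for 1950, both in tenths of a degree Celsius.

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class DumpAvroOutput {
    public static void main(String[] args) throws IOException {
        // Path to the job output file; adjust to your environment
        File file = new File("H:\\ideaworkspace\\1-tmp\\output1\\part-r-00000.avro");
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                // e.g. {"year": 1949, "temperature": 111, "stationId": "012650-99999"}
                System.out.println(record);
            }
        }
    }
}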