  • Hadoop: sorting with Avro

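    In the previous example we used the Avro framework to find the maximum value in the data. This example uses Avro to sort the data. The input is the same sample as before, and the output is written as text (it could also be written in Avro format).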

    1. Set the sort order directly in the Avro schema.

    dataRecord.avsc, placed in the resources directory:

    {
        "type": "record",
        "name": "WeatherRecord",
        "doc": "A weather reading",
        "fields": [
            {"name": "year", "type": "int"},
            {"name": "temperature", "type": "int", "order": "descending"}
        ]
    }
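    According to the Avro specification, records are compared field by field in declaration order, and each field's `order` attribute (default `ascending`) sets its direction. For this schema that means: by `year` ascending, then by `temperature` descending. The stdlib-only sketch below mimics that ordering with a plain `Comparator` so the resulting order is easy to see. `AvroOrderSketch`, `Reading` and `sortLikeShuffle` are hypothetical names, the sample values are made up, and this is not Avro's own comparison code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class AvroOrderSketch {
    // Hypothetical stand-in for one WeatherRecord datum.
    static final class Reading {
        final int year;
        final int temperature;

        Reading(int year, int temperature) {
            this.year = year;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return year + "\t" + temperature;
        }
    }

    // Mimics the schema's sort order: fields compared in declaration order,
    // "year" ascending (the default), then "temperature" descending.
    static final Comparator<Reading> SCHEMA_ORDER =
            Comparator.comparingInt((Reading r) -> r.year)
                    .thenComparing(Comparator.comparingInt((Reading r) -> r.temperature).reversed());

    // Returns a sorted copy, standing in for the sort done during the shuffle.
    static List<Reading> sortLikeShuffle(List<Reading> input) {
        List<Reading> sorted = new ArrayList<>(input);
        sorted.sort(SCHEMA_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<Reading> sample = Arrays.asList(
                new Reading(1950, 0), new Reading(1949, 111),
                new Reading(1950, 22), new Reading(1949, 78), new Reading(1950, -11));
        for (Reading r : sortLikeShuffle(sample)) {
            System.out.println(r);
        }
    }
}
```

    Running `main` lists the 1949 readings first, and within each year the readings go from highest to lowest temperature, which is the order the reducer receives its keys in.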

    The original constants class, modified so that the schema can also be loaded from the resource file:

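    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.Schema;

    public class AvroSchemas {
        private Schema currentSchema;

        // Inline schema constant. In this example the mapper loads the schema from
        // the resource file instead; the job still passes this constant to AvroJob.
        public static final Schema SCHEMA = new Schema.Parser().parse("{\n" +
                "\t\"type\":\"record\",\n" +
                "\t\"name\":\"WeatherRecord\",\n" +
                "\t\"doc\":\"A weather reading\",\n" +
                "\t\"fields\":[\n" +
                "\t\t{\"name\":\"year\",\"type\":\"int\"},\n" +
                "\t\t{\"name\":\"temperature\",\"type\":\"int\",\"order\":\"descending\"}\n" +
                "\t]\n" +
                "}");

        public AvroSchemas() throws IOException {
            Schema.Parser parser = new Schema.Parser();
            // Read the Avro schema from the resource file. The leading "/" resolves
            // the path from the classpath root (the resources directory) rather than
            // relative to this class's package.
            try (InputStream in = getClass().getResourceAsStream("/dataRecord.avsc")) {
                this.currentSchema = parser.parse(in);
            }
        }

        public Schema getCurrentSchema() {
            return currentSchema;
        }
    }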

    2. Mapper

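    import java.io.IOException;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class AvroMapper extends Mapper<LongWritable, Text, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
        // RecordParser comes from the earlier example (not shown here)
        private RecordParser parser = new RecordParser();
    //    private GenericRecord record = new GenericData.Record(AvroSchemas.SCHEMA);
        private AvroSchemas schema;
        private GenericRecord record;

        public AvroMapper() throws IOException {
            // Load the schema from dataRecord.avsc instead of using the constant
            schema = new AvroSchemas();
            record = new GenericData.Record(schema.getCurrentSchema());
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value.toString());
            if (parser.isValid()) {
                record.put("year", parser.getYear());
                record.put("temperature", parser.getData());
                // The whole record is the key, so the shuffle sorts on it; the same
                // record is also sent as the value. Reusing one record object is safe
                // because context.write() serializes it immediately.
                context.write(new AvroKey<>(record), new AvroValue<>(record));
            }
        }
    }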
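    The mapper's job is only to turn each valid input line into one record and emit it as both key and value. Since `RecordParser` is not shown here, the stdlib-only sketch below swaps in a hypothetical `"year,temperature"` line format; `MapStepSketch`, `WeatherRecord`, `parse` and `map` are illustrative names, not part of Hadoop or Avro. A `null` from `parse` plays the role of `isValid()` returning false.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class MapStepSketch {
    // Hypothetical stand-in for the GenericRecord built by AvroMapper.
    static final class WeatherRecord {
        final int year;
        final int temperature;

        WeatherRecord(int year, int temperature) {
            this.year = year;
            this.temperature = temperature;
        }
    }

    // Hypothetical parser for "year,temperature" lines; returns null when the
    // line is invalid (the equivalent of RecordParser.isValid() being false).
    static WeatherRecord parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return null;
        }
        try {
            return new WeatherRecord(Integer.parseInt(parts[0].trim()),
                                     Integer.parseInt(parts[1].trim()));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Mirrors AvroMapper.map(): each valid line yields one (key, value) pair in
    // which the same record is both the key and the value; invalid lines are skipped.
    static List<Map.Entry<WeatherRecord, WeatherRecord>> map(List<String> lines) {
        List<Map.Entry<WeatherRecord, WeatherRecord>> output = new ArrayList<>();
        for (String line : lines) {
            WeatherRecord record = parse(line);
            if (record != null) {
                output.add(new AbstractMap.SimpleImmutableEntry<>(record, record));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1950,22", "garbage", "1949,111", "1950,");
        for (Map.Entry<WeatherRecord, WeatherRecord> kv : map(input)) {
            System.out.println(kv.getKey().year + " -> " + kv.getValue().temperature);
        }
    }
}
```

    Putting every field into the key is what lets a single schema-driven comparison sort by year and temperature together, with no custom comparator or partitioner.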

    3. Reducer

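    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class AvroReducer extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, IntPair, NullWritable> {
        // Multiple outputs: in this example, one file per year
        private MultipleOutputs<IntPair, NullWritable> multipleOutputs;

        /**
         * Called once at the start of the task.
         *
         * @param context the task context
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(AvroKey<GenericRecord> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
            // Sorting was done in the shuffle, so the reducer only writes the data out
            for (AvroValue<GenericRecord> value : values) {
                GenericRecord record = value.datum();
                // The year is the base output path, giving one file per year
                multipleOutputs.write(new IntPair((Integer) record.get("year"), (Integer) record.get("temperature")), NullWritable.get(), record.get("year").toString());
    //            context.write(new IntPair((Integer) record.get("year"),(Integer)(record.get("temperature"))),NullWritable.get());
            }
        }

        /**
         * Called once at the end of the task. MultipleOutputs must be closed here,
         * otherwise its record writers may not be flushed and output can be lost.
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }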
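    `MultipleOutputs` names each file after the base output path followed by `-r-` and the zero-padded reduce task number, so with the year as the base path a single reducer produces files such as `1950-r-00000`. The stdlib-only sketch below imitates that routing of already-sorted records into per-year buckets. `PerYearOutputSketch`, `partFileName` and `route` are hypothetical names, and the `year<TAB>temperature` line format is an assumption, since each real line is whatever `IntPair.toString()` returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PerYearOutputSketch {
    // File name derived from a base output path in a reduce task:
    // "<base>-r-<5-digit task id>", e.g. "1950-r-00000".
    static String partFileName(String baseOutputPath, int taskId) {
        return String.format("%s-r-%05d", baseOutputPath, taskId);
    }

    // Routes (year, temperature) pairs, already sorted by the shuffle, into one
    // bucket per year, as AvroReducer does by passing the year as the base path.
    // The "year<TAB>temperature" line format is an assumption for illustration.
    static Map<String, List<String>> route(int[][] sortedPairs, int taskId) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        for (int[] pair : sortedPairs) {
            String file = partFileName(Integer.toString(pair[0]), taskId);
            files.computeIfAbsent(file, k -> new ArrayList<>()).add(pair[0] + "\t" + pair[1]);
        }
        return files;
    }

    public static void main(String[] args) {
        int[][] sorted = {{1949, 111}, {1949, 78}, {1950, 22}, {1950, 0}, {1950, -11}};
        for (Map.Entry<String, List<String>> file : route(sorted, 0).entrySet()) {
            System.out.println(file.getKey() + ": " + file.getValue());
        }
    }
}
```

    Because the input arrives sorted, each per-year bucket keeps the temperatures in descending order without any extra sorting in the reducer.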

    4. Job

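    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class AvroSort extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            // Allow small jobs to run as an uber task in the ApplicationMaster's JVM
            conf.set("mapreduce.job.ubertask.enable", "true");

            Job job = Job.getInstance(conf, "Avro sort");
            job.setJarByClass(AvroSort.class);

            // Set the Avro key and value schemas through AvroJob rather than Job.
            // setMapOutputKeySchema also installs the Avro key comparator, so the
            // shuffle sorts keys by the schema (year ascending, temperature descending).
            AvroJob.setMapOutputKeySchema(job, AvroSchemas.SCHEMA);
            AvroJob.setMapOutputValueSchema(job, AvroSchemas.SCHEMA);
    //        AvroJob.setOutputKeySchema(job,AvroSchemas.SCHEMA);

            job.setMapperClass(AvroMapper.class);
            job.setReducerClass(AvroReducer.class);

            job.setInputFormatClass(TextInputFormat.class);
    //        job.setOutputFormatClass(AvroKeyOutputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // Declare the reducer's output types (written out as text)
            job.setOutputKeyClass(IntPair.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            Path outPath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outPath);

            // Delete the output path if it already exists
            FileSystem fileSystem = outPath.getFileSystem(conf);
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true);
            }

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new AvroSort(), args);
            System.exit(exitCode);
        }
    }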
  • Original article: https://www.cnblogs.com/asker009/p/10436983.html