  • Hadoop: Finding the Maximum Value with Avro

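    The previous example, an analysis of Hadoop MapReduce secondary sort, used secondary sort to find the maximum value for each year.

    This example computes the same per-year maximum using the MapReduce support of the Avro serialization framework. Avro's advantages are not discussed here.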

    1. Add the dependencies (no plugin is used)

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.2.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-mapred</artifactId>
                <version>1.8.2</version>
            </dependency>

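    2. Define the Avro schema. The sample is the same data used in the previous example, with only two fields: the year and the data value.

    The Avro schema should look like this: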
    {
    	"type":"record",
    	"name":"WeatherRecord",
    	"doc":"A weather reading",
    	"fields":[
    		{"name":"year","type":"int"},
    		{"name":"temperature","type":"int"}
    	]	
    }

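    In this example the schema is defined directly as a constant. It could also be read from a file if needed; each approach has its trade-offs.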

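    import org.apache.avro.Schema;

    public class AvroSchemas {
        // Schema for one weather reading; the JSON matches the definition above.
        public static final Schema SCHEMA = new Schema.Parser().parse("{\n" +
                "\t\"type\":\"record\",\n" +
                "\t\"name\":\"WeatherRecord\",\n" +
                "\t\"doc\":\"A weather reading\",\n" +
                "\t\"fields\":[\n" +
                "\t\t{\"name\":\"year\",\"type\":\"int\"},\n" +
                "\t\t{\"name\":\"temperature\",\"type\":\"int\"}\n" +
                "\t]\n" +
                "}");
    }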
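
    The read-from-file option mentioned above could look roughly like the sketch below. It only loads the schema JSON as text using the JDK. SchemaText and load are illustrative names, not part of the original example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: loads schema JSON from a file instead of a constant.
class SchemaText {
    // Reads the whole file as UTF-8 text; I/O failures surface as unchecked exceptions.
    static String load(Path schemaFile) {
        try {
            return new String(Files.readAllBytes(schemaFile), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    The returned string can be handed to new Schema.Parser().parse(...), the same call AvroSchemas makes with its constant. A file lets the schema change without recompiling, while a constant avoids depending on a file at run time.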

    3. Mapper

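    import java.io.IOException;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // RecordParser is not shown in this post; it parses one input line into a year and a data value.
    public class AvroMapper extends Mapper<LongWritable,Text,AvroKey<Integer>,AvroValue<GenericRecord>> {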
        private RecordParser parser = new RecordParser();
        private GenericRecord record = new GenericData.Record(AvroSchemas.SCHEMA);
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value.toString());
            if(parser.isValid()){
                record.put("year",parser.getYear());
                record.put("temperature",parser.getData());
                context.write(new AvroKey<>(parser.getYear()),new AvroValue<>(record));
            }
        }
    }
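
    The mapper relies on a RecordParser that this post does not show. As a rough idea of what such a helper might do, here is a hypothetical stand-in. The class name RecordParserSketch and the input layout (year and value separated by whitespace) are assumptions; only the method names parse, isValid, getYear, and getData come from the mapper above.

```java
// Hypothetical stand-in for the RecordParser used by the mapper.
// Assumption: each input line is "<year><whitespace><value>", e.g. "1990 100".
class RecordParserSketch {
    private int year;
    private int data;
    private boolean valid;

    // Parses one line; results are read through the getters below.
    public void parse(String line) {
        valid = false;
        if (line == null) {
            return;
        }
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return;
        }
        try {
            year = Integer.parseInt(parts[0]);
            data = Integer.parseInt(parts[1]);
            valid = true;
        } catch (NumberFormatException e) {
            // Non-numeric fields leave the parser in the invalid state.
        }
    }

    public boolean isValid() { return valid; }

    public int getYear() { return year; }

    public int getData() { return data; }
}
```

    Keeping validity as a separate flag lets the mapper skip malformed lines with a single isValid() check instead of handling exceptions per record.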

    4. Reducer

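    import java.io.IOException;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class AvroReducer extends Reducer<AvroKey<Integer>,AvroValue<GenericRecord>,AvroKey<GenericRecord>,NullWritable> {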
    
        @Override
        protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
            GenericRecord max = null;
            for (AvroValue<GenericRecord> value : values){
                GenericRecord record = value.datum();
                if(max==null ||
                        (Integer)record.get("temperature") > (Integer) max.get("temperature")){
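                    // A new GenericRecord must be created here; do not simply assign max = record.
                    // For efficiency, the values iterator reuses the same instance on every iteration.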
                    max = newRecord(record);
                }
            }
            context.write(new AvroKey<>(max),NullWritable.get());
        }
    
        private GenericRecord newRecord(GenericRecord value){
            GenericRecord record = new GenericData.Record(AvroSchemas.SCHEMA);
            record.put("year",value.get("year"));
            record.put("temperature",value.get("temperature"));
    
            return record;
        }
    }
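
    The comment in the reducer about instance reuse is easy to overlook, so here is a small plain-Java illustration of the pitfall. It uses no Hadoop or Avro classes: Reading, reusing, maxByReference, and maxByCopy are made-up names, and the iterator only imitates the reuse behaviour the reducer comment describes.

```java
import java.util.Iterator;

// Plain-Java illustration of why the reducer copies the record instead of keeping a reference.
class ReuseDemo {
    // Mutable holder standing in for a reused GenericRecord.
    static final class Reading {
        int temperature;
    }

    // Iterable whose iterator refills ONE shared Reading on every next() call.
    static Iterable<Reading> reusing(int... temps) {
        Reading shared = new Reading();
        return () -> new Iterator<Reading>() {
            private int i = 0;

            @Override
            public boolean hasNext() { return i < temps.length; }

            @Override
            public Reading next() {
                shared.temperature = temps[i++];
                return shared;
            }
        };
    }

    // Buggy: max points at the shared instance, so it silently tracks the latest value.
    static int maxByReference(Iterable<Reading> values) {
        Reading max = null;
        for (Reading r : values) {
            if (max == null || r.temperature > max.temperature) {
                max = r;
            }
        }
        return max.temperature;
    }

    // Correct: the current value is copied out of the shared instance.
    static int maxByCopy(Iterable<Reading> values) {
        Reading max = null;
        for (Reading r : values) {
            if (max == null || r.temperature > max.temperature) {
                max = new Reading();
                max.temperature = r.temperature;
            }
        }
        return max.temperature;
    }
}
```

    For the values 30, 100, 20, maxByReference returns 20 (whatever the shared instance held last), while maxByCopy returns 100. The newRecord helper in the reducer plays the same role as the copy here.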

    5. The job. This is the key part, and it differs somewhat from an ordinary job.

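    import org.apache.avro.Schema;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class AvroSort extends Configured implements Tool {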
        /**
         * Execute the command with the given arguments.
         *
         * @param args command specific arguments.
         * @return exit code.
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            conf.set("mapreduce.job.ubertask.enable","true");
    
            Job job = Job.getInstance(conf,"Avro sort");
            job.setJarByClass(AvroSort.class);
    
            // Set the Avro key and value schemas for map output and job output through AvroJob rather than through Job
            AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
            AvroJob.setMapOutputValueSchema(job,AvroSchemas.SCHEMA);
            AvroJob.setOutputKeySchema(job,AvroSchemas.SCHEMA);
    
            job.setMapperClass(AvroMapper.class);
            job.setReducerClass(AvroReducer.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(AvroKeyOutputFormat.class);
            // Text output also works; the AvroKey is then written as JSON text
            // job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
            Path outPath = new Path(args[1]);
            FileSystem fileSystem = outPath.getFileSystem(conf);
            // Delete the output path if it already exists
            if(fileSystem.exists(outPath))
            {
                fileSystem.delete(outPath,true);
            }
    
            return job.waitForCompletion(true) ? 0:1;
        }
    
        public static void main(String[] args) throws Exception{
            int exitCode = ToolRunner.run(new AvroSort(),args);
            System.exit(exitCode);
        }
    }

    6. To view the Avro file, download the Avro tools jar, avro-tools-1.8.2.jar. Mirror link: https://mirrors.tuna.tsinghua.edu.cn/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar

    [hadoop@bigdata-senior01 ~]$ java -jar avro-tools-1.8.2.jar tojson part-r-00000.avro 
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    {"year":1990,"temperature":100}
    {"year":1991,"temperature":100}
    {"year":1992,"temperature":100}
    {"year":1993,"temperature":100}
    {"year":1994,"temperature":100}
    {"year":1995,"temperature":100}
    {"year":1996,"temperature":100}
    {"year":1997,"temperature":100}
    {"year":1998,"temperature":100}
    {"year":1999,"temperature":100}
    {"year":2000,"temperature":100}

    If the job uses text output instead, the result can be viewed directly with cat.

    [hadoop@bigdata-senior01 ~]$ hadoop fs -cat /output6/part-r-00000
    {"year": 1990, "temperature": 100}
    {"year": 1991, "temperature": 100}
    {"year": 1992, "temperature": 100}
    {"year": 1993, "temperature": 100}
    {"year": 1994, "temperature": 100}
    {"year": 1995, "temperature": 100}
    {"year": 1996, "temperature": 100}
    {"year": 1997, "temperature": 100}
    {"year": 1998, "temperature": 100}
    {"year": 1999, "temperature": 100}
    {"year": 2000, "temperature": 100}
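
    As a cross-check on output like the listings above, the same per-year maximum can be computed locally on a small sample. This is only a sketch in plain Java: LocalMax and maxPerYear are made-up names, and each reading is assumed to be an int pair of {year, value}.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local reference computation of the per-year maximum, without Hadoop or Avro.
class LocalMax {
    // Returns year -> maximum value; TreeMap keeps the years in ascending order.
    static Map<Integer, Integer> maxPerYear(List<int[]> readings) {
        Map<Integer, Integer> max = new TreeMap<>();
        for (int[] r : readings) {
            // r[0] is the year, r[1] the value; keep the larger of old and new.
            max.merge(r[0], r[1], Integer::max);
        }
        return max;
    }
}
```

    The sorted map mirrors the job's output order, where the reducer receives the year keys in ascending order.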
  • Original article: https://www.cnblogs.com/asker009/p/10436132.html