  • Using Avro in MapReduce

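    In my view, using Avro in MapReduce can improve data-processing performance, mainly in the following ways:

    • Data files serialized in Avro's binary format can be supplied to the Job directly as input
    • Parsing the data is comparatively fast
    • Sorting support (Avro defines a sort order for its data)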
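
    As a rough illustration of why the binary format is compact and cheap to parse: record data in Avro's binary encoding carries no field names, and the Avro specification encodes an int as a zig-zag, variable-length value, so small numbers take a single byte. The sketch below reimplements that int encoding in plain Java purely for illustration. The class and method names are hypothetical, and real code would rely on Avro's own encoders rather than this helper.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration class; not part of the Avro library.
final class AvroIntEncodingSketch {

    // Encodes an int as described in the Avro specification:
    // zig-zag mapping first, then a base-128 variable-length encoding.
    static byte[] encodeInt(int n) {
        int z = (n << 1) ^ (n >> 31);        // zig-zag: small magnitudes map to small unsigned values
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7F) != 0) {           // more than 7 significant bits remain
            out.write((z & 0x7F) | 0x80);    // emit low 7 bits with the continuation bit set
            z >>>= 7;
        }
        out.write(z);                        // final byte, continuation bit clear
        return out.toByteArray();
    }

    // Formats bytes as space-separated lowercase hex, e.g. "80 01".
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encodeInt(1)));    // prints 02
        System.out.println(toHex(encodeInt(64)));   // prints 80 01
        System.out.println(toHex(encodeInt(-64)));  // prints 7f
    }
}
```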

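    The Avro website also provides a ColorCount example. It demonstrates using Avro-serialized binary data files as the input of a MapReduce Job and, once the computation finishes, writing the results as Avro-serialized data files as well. The source code and the steps for this example follow: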

    1. The project's pom file:

    <dependencies>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-mapred</artifactId>
                <version>1.7.7</version>
                <classifier>hadoop2</classifier>   <!-- Maven classifier: selects the variant of the jar built for Hadoop 2 -->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.9.2</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.zpark.demo.avro.mapreduce.MapReduceColorCount</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

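    Notes:

    When adding the avro-mapred dependency, be sure to set the classifier:
    <classifier>hadoop2</classifier>. More generally, when using an unfamiliar dependency, check its pom file to see whether it defines classifiers.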


    2. Mapper and Reducer code
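    package com.zpark.demo.avro.mapreduce;

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyInputFormat;
    import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MapReduceColorCount extends Configured implements Tool {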
    
        public static class ColorCountMapper extends
                Mapper<AvroKey<GenericRecord>, NullWritable, Text, IntWritable> {
    
            @Override
            public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
                    throws IOException, InterruptedException {
    
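                // With GenericRecord, Avro strings normally arrive as Utf8 objects rather than
                // java.lang.String, so convert with toString() instead of casting.
                Object favoriteColor = key.datum().get("favorite_color");
                String color = (favoriteColor == null) ? "none" : favoriteColor.toString();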
                context.write(new Text(color), new IntWritable(1));
            }
        }
    
        public static class ColorCountReducer extends
                Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {
    
            @Override
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context) throws IOException, InterruptedException {
    
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(new AvroKey<CharSequence>(key.toString()), new AvroValue<Integer>(sum));
            }
        }
    
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: MapReduceColorCount <input path> <output path>");
                return -1;
            }
    
            Job job = Job.getInstance(getConf(), "Color Count");
            job.setJarByClass(MapReduceColorCount.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // Input: Avro data files; each record reaches the mapper as an AvroKey<GenericRecord>.
            job.setInputFormatClass(AvroKeyInputFormat.class);
            job.setMapperClass(ColorCountMapper.class);
            AvroJob.setInputKeySchema(job, ColorCountSchema.schema);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // Output: an Avro data file of key/value pairs (color as string, count as int).
            job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
            job.setReducerClass(ColorCountReducer.class);
            AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
            AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));
    
            return (job.waitForCompletion(true) ? 0 : 1);
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new MapReduceColorCount(), args);
            System.exit(res);
        }
    }
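
    To see what the job computes without starting Hadoop, the map and reduce logic above can be mimicked in memory. This is only a sketch in plain Java: ColorCountSketch and its methods are hypothetical names, the input list stands in for the favorite_color field of each record, and the sample values are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Avro or Hadoop: mimics the job's logic in memory.
final class ColorCountSketch {

    // "Map" step: a missing favorite_color becomes the key "none", as in ColorCountMapper.
    static String mapColor(String favoriteColor) {
        return favoriteColor == null ? "none" : favoriteColor;
    }

    // "Shuffle" and "reduce" steps: group by key and add 1 per record, as in ColorCountReducer.
    // A TreeMap keeps the keys sorted, loosely mirroring the sorted input a reducer sees.
    static Map<String, Integer> countColors(List<String> favoriteColors) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String color : favoriteColors) {
            counts.merge(mapColor(color), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample input; null represents a user without a favorite color.
        List<String> colors = Arrays.asList("red", null, "blue", "red");
        System.out.println(countColors(colors)); // prints {blue=1, none=1, red=2}
    }
}
```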

    3. Avro Schema

    The following class builds the Schema object used for Avro serialization:

    package com.zpark.demo.avro.mapreduce;

    import org.apache.avro.Schema;

    public class ColorCountSchema {
    
        // Reader schema for the input records; mirrors the user.avsc file.
        public static final Schema schema = new Schema.Parser().parse(
                new StringBuilder()
                        .append("{\"namespace\": \"com.zpark.demo.avro.mapreduce\",")
                        .append("\"type\": \"record\",")
                        .append("\"name\": \"User\",")
                        .append("\"fields\": [")
                        .append("{\"name\": \"name\", \"type\": \"string\"},")
                        .append("{\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},")
                        .append("{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}")
                        .append("]")
                        .append("}").toString()
        );
    }

    Below is the content of the corresponding user.avsc file:

    {"namespace": "com.zpark.demo.avro.mapreduce",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }
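
    The steps here do not show how an input data file for the job is produced. One possible route, assuming the avro-tools jar is available, is to write a few sample records as JSON lines and convert them with the fromjson tool. The sketch below only generates that JSON in Avro's JSON encoding, where a non-null union value is wrapped in an object keyed by its type. The class name, the users.json file name, and the sample users are all made up for illustration, and the helper assumes the values contain no characters that need JSON escaping.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for producing sample input; not part of the original example.
final class SampleUsersJson {

    // Renders one User record in Avro's JSON encoding: a non-null union value is wrapped
    // in an object keyed by its branch type, while the null branch is a bare null.
    static String toAvroJson(String name, Integer favoriteNumber, String favoriteColor) {
        String number = favoriteNumber == null ? "null" : "{\"int\": " + favoriteNumber + "}";
        String color = favoriteColor == null ? "null" : "{\"string\": \"" + favoriteColor + "\"}";
        return "{\"name\": \"" + name + "\", \"favorite_number\": " + number
                + ", \"favorite_color\": " + color + "}";
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(toAvroJson("Alyssa", 256, null));
        lines.add(toAvroJson("Ben", 7, "red"));
        lines.add(toAvroJson("Charlie", null, "blue"));
        // Writes one JSON record per line to users.json in the working directory.
        Files.write(Paths.get("users.json"), lines, StandardCharsets.UTF_8);
    }
}
```

    Assuming the schema above is saved as user.avsc, a command along the lines of java -jar avro-tools-1.9.1.jar fromjson --schema-file user.avsc users.json > users.avro should then produce an Avro data file that can serve as the job input; check the tool's own usage message for the exact options.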

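    4. Package the jar and upload it to the Hadoop environment to run

      Note that the dependency jar avro-mapred-1.7.7-hadoop2.jar must also be uploaded to the $HADOOP_HOME/share/hadoop/mapreduce directory, and it must be the build with the hadoop2 classifier. Otherwise the job fails with the error discussed here: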

     https://stackoverflow.com/questions/29448222/found-interface-org-apache-hadoop-mapreduce-taskattemptcontext

    5. View the results
    The job writes a result file with a name like part-r-00000.avro. You can inspect it with java -jar avro-tools-1.9.1.jar tojson part-r-00000.avro. In addition, java -jar avro-tools-1.9.1.jar help prints the full list of avro-tools commands:
    Version 1.9.1
     of Apache Avro
    Copyright 2010-2015 The Apache Software Foundation
    
    This product includes software developed at
    The Apache Software Foundation (https://www.apache.org/).
    ----------------
    Available tools:
        canonical  Converts an Avro Schema to its canonical form
              cat  Extracts samples from files
          compile  Generates Java code for the given schema.
           concat  Concatenates avro files without re-compressing.
      fingerprint  Returns the fingerprint for the schemas.
       fragtojson  Renders a binary-encoded Avro datum as JSON.
         fromjson  Reads JSON records and writes an Avro data file.
         fromtext  Imports a text file into an avro data file.
          getmeta  Prints out the metadata of an Avro data file.
        getschema  Prints out schema of an Avro data file.
              idl  Generates a JSON schema from an Avro IDL file
     idl2schemata  Extract JSON schemata of the types from an Avro IDL file
           induce  Induce schema/protocol from Java class/interface via reflection.
       jsontofrag  Renders a JSON-encoded Avro datum as binary.
           random  Creates a file with randomly generated instances of a schema.
          recodec  Alters the codec of a data file.
           repair  Recovers data from a corrupt Avro Data file
      rpcprotocol  Output the protocol of a RPC service
       rpcreceive  Opens an RPC Server and listens for one message.
          rpcsend  Sends a single RPC message.
           tether  Run a tethered mapreduce job.
           tojson  Dumps an Avro data file as JSON, record per line or pretty.
           totext  Converts an Avro data file to a text file.
         totrevni  Converts an Avro data file to a Trevni file.
      trevni_meta  Dumps a Trevni file's metadata as JSON.
    trevni_random  Create a Trevni file filled with random instances of a schema.
    trevni_tojson  Dumps a Trevni file as JSON.
    
    
    
     
    
    
    
  • Original article: https://www.cnblogs.com/hzhuxin/p/12271729.html