zoukankan      html  css  js  c++  java
  • 《Hadoop实战》之专利统计

    数据

    专利数据的特性

    • 专利引用数据所构成的关系图与网页链接以及社会网络图大同小异
    • 专利发布以时间为序,特性类似于时间序列
    • 专利关联到一个人和一个位置,可视为个人信息或地理数据

    首先拿到专利数据:http://data.nber.org/patents/

    本文使用是的cite75-99.txt,该文件涵盖了自1975年到1999年间对美国专利的引用,包含超过1600万条数据,前几行如下图:

    其中第一列为专利号、第二列为被第一列引用的专利号

    CITING CITED
    3858241 956203

    需求一

    1. 读取专利引用数据,对于每一个专利找到哪些专列对他进行了引用并进行合并。
    2. 进行倒序排序

    思路

    • 针对1,将Mapper的键值互换输出即可
    • 针对2,简单方法是设置reduce=1,直接输出全局排序文件
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    
    // 读取专利引用数据,对于每一个专利找到哪些专列对他进行了引用并进行合并。
    public class FindCitedPatentsAndOrder extends Configured implements Tool {
    
        public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
    
            @Override
            public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
                output.collect(value, key);  // 关键点
            }
        }
    
        public static class ReduceClass extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    
            @Override
            public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                               Reporter reporter) throws IOException {
                String csv = "";
                while (values.hasNext()) {
                    if (csv.length() > 0) csv += ",";
                    csv += values.next().toString();
                }
                output.collect(key, new Text(csv));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            JobConf job = new JobConf(getConf(), getClass());
    
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
    
            job.setJobName("FindCitedPatentsAndOrder");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
    
            job.setInputFormat(KeyValueTextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.set("key.value.separator.in.input.line", ",");
    
            JobClient.runJob(job);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new FindCitedPatentsAndOrder(), args);
            System.exit(exitCode);
        }
    }
    

    需求二

    • 计算不同引用次数专利的数目

    思路

    • 在FindCitedPatentsAndOrder的基础上修改Reduce函数,统计数目
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    // 计算不同引用次数专利的数目
    public class CitedPatentsNumberCounter extends Configured implements Tool {
        public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
    
            @Override
            public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
                output.collect(value, key);  // 关键点
            }
        }
    
        public static class ReduceClass extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
    
            @Override
            public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output,
                               Reporter reporter) throws IOException {
                int count = 0;
                while (values.hasNext()){
                    values.next();
                    count++;
                }
                output.collect(key, new IntWritable(count));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            JobConf job = new JobConf(getConf(), getClass());
    
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
    
            job.setJobName("CitedPatentsNumberCounter");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
    
            job.setInputFormat(KeyValueTextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);  // 同时设置K2,V2和K3,V3的类型
    
            JobClient.runJob(job);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new CitedPatentsNumberCounter(), args);
            System.exit(exitCode);
        }
    }
    
    

    需求三

    • 在需求二输出的被引用数量数据的基础上,统计被引用数量的频次

    思路

    • 将输入数据替换成需求二的输出数据即可
    • 为了重用Mapper里面的OutputKey和OutputValue,将它们在类中初始化
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class CitationFrequencyStatistics extends Configured implements Tool {
        public static class MapClass extends MapReduceBase implements Mapper<Text, Text, IntWritable, IntWritable> {
            private final  static IntWritable UNO = new IntWritable(1);  // 单位1
            private IntWritable citationCount = new IntWritable();
    
    
            @Override
            public void map(Text key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                    throws IOException {
                citationCount.set(Integer.parseInt(value.toString()));
                output.collect(citationCount, UNO);  // 关键点
            }
        }
    
        public static class ReduceClass extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    
            @Override
            public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<IntWritable, IntWritable> output,
                               Reporter reporter) throws IOException {
                int count = 0;
                while (values.hasNext()){
                    values.next();
                    count++;
                }
                output.collect(key, new IntWritable(count));
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            JobConf job = new JobConf(getConf(), getClass());
    
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
    
            job.setJobName("CitationFrequencyStatistics");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
    
            job.setInputFormat(KeyValueTextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);  // 同时设置K2,V2和K3,V3的类型
    
            JobClient.runJob(job);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new CitationFrequencyStatistics(), args);
            System.exit(exitCode);
        }
    }
    
    

    Hadoop的新API

    Hadoop最新版本的MapReduce Release 0.20.0的API包括了一个全新的Mapreduce JAVA API,有时候也称为上下文对象。

      新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。

      新的API和旧的API之间有下面几个明显的区别。

    • 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
    • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
    • 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
    • 新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
    • 新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

    基于新api重写的Hadoop基础程序模板

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    
    import java.io.IOException;
    
    public class MyJob extends Configured implements Tool {
    
        public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] citation = value.toString().split(",");
                context.write(new Text(citation[1]), new Text(citation[0]));
            }
        }
    
        public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String csv = "";
                for (Text val: values) {
                    if (csv.length() > 0) csv += ",";
                    csv += val.toString();
                }
                context.write(key, new Text(csv));
            }
        }
    
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
    
            Job job = new Job(conf, "Myjob");
            job.setJarByClass(MyJob.class);
    
            job.setMapperClass(MapClass.class);
            job.setReducerClass(ReduceClass.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            return 0;
        }
    }
    
    
  • 相关阅读:
    线程
    网络编程
    正则表达式
    XML
    java----八种排序算法
    Java 关键字 速查表
    Day_19多线程(下)
    Day18_进程(中)
    Day17_进程(上)
    Day15_IO流(上)
  • 原文地址:https://www.cnblogs.com/vvlj/p/14099275.html
Copyright © 2011-2022 走看看