zoukankan      html  css  js  c++  java
  • 大数据学习之七——MapReduce简单代码实例

    1.关于MapReduce

    MapReduce是一种可用于数据处理的编程模型,能够支持java、Python、C++等语言。MapReduce程序本质上是并行运行的,因此可以处理大规模数据集,这也是它的优势。

    2.使用hadoop分析数据

    hadoop提供了并行处理,我们将查询表示成MapReduce作业。

    MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值作为输入和输出,并选择它们的类型。程序员还需要定义两个函数:map函数和reduce函数。

    Java  MapReduce

    我们需要三个东西:一个map函数,一个reduce函数和一些用来运行作业的代码。map函数由mapper接口实现。

    Mapper接口是一个泛型类型,有四个形参,分别指定map函数的输入键、输入值、输出键和输出值的类型。这些类型均可在org.apache.hadoop.io包中找到。其中,LongWritable类型相当于java中的Long类型、Text类型相当于java中的String类型、IntWritable类型相当于java中的Integer类型。

    在主函数中经常使用的类有:

    FileOutputFormat类中的静态函数setOutputPath()来指定输出路径,该函数指定了reduce函数输出文件的写入目录。在运行任务前该目录不应该存在。接着通过setMapperClass()和setReducerClass()指定map和reduce类型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型。输入的类型通过InputFormat类来控制,在设置定义map和reduce函数的类之后,JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。

    3.统计单词数量代码实例

    package mapreduce01;  //MapReduce工程名字

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    //单词计数

    public class mytest {    

    static String INPUT_PATH="hdfs://master:9000/input/mr.txt";   //待统计的文件路径

    static String OUTPUT_PATH="hdfs://master:9000/output/mr.txt";    //统计结果存放的路径

    static class MyMapper extends Mapper <Object,Object,Text,IntWritable> {     //定义继承mapper类

    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    //定义map方法

    String[] arr=value.toString().split(",");      //文件中的单词是以“,”分割的,并将每一行定义为一个数组

    for(int i=0;i<arr.length;i++){      //遍历循环每一行,统计单词出现的数量

    context.write(new Text(arr[i]),new IntWritable(1));  

      }

     }  

    }    

    static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{     //定义继承reducer类

    protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      //定义reduce方法

    int count=0;    

    for(IntWritable c:values){     //统计同一个单词的数量

    count+=c.get();    

    }    

    IntWritable outValue=new IntWritable(count);    

    context.write(key,outValue);    

    }   

    }  

     public static void main(String[] args) throws Exception{    //main函数

     Path outputpath=new Path(OUTPUT_PATH);    //输出路径

    Configuration conf=new Configuration();   

    Job job=Job.getInstance(conf);     //定义一个job,启动任务

    FileInputFormat.setInputPaths(job, INPUT_PATH);  

     FileOutputFormat.setOutputPath(job,outputpath);     

     job.setMapperClass(MyMapper.class);  

     job.setReducerClass(MyReduce.class);     

     job.setOutputKeyClass(Text.class);  

     job.setOutputValueClass(IntWritable.class);     

     job.waitForCompletion(true);  

    }

    }

    4.统计去重代码实例

    package mapreduce01;  //MapReduce工程名字

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    //单词去重

    public class testquchong {    

    static String INPUT_PATH="hdfs://master:9000/quchong";   //待统计的文件

    static String OUTPUT_PATH="hdfs://master:9000/quchong/qc";    //统计结果存放的路径

    static class MyMapper extends Mapper<Object,Text,Text,Text>{   

    private static Text line=new Text();      //text相当于string

    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{    

    line=value;   

     context.write(line,new Text(","));     //以“,”规定格式,空格不容易控制,统计key,因为key值是唯一的

     }

     }    

    static class MyReduce extends Reducer<Text,Text,Text,Text>{

    protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{       

    context.write(key,new Text(""));    

     }  

     public static void main(String[] args) throws Exception{   

    Path outputpath=new Path(OUTPUT_PATH);   

    Configuration conf=new Configuration();   

    Job job=Job.getInstance(conf);     

     job.setMapperClass(MyMapper.class);  

     job.setReducerClass(MyReduce.class);   

    job.setCombinerClass(MyReduce.class);     

     job.setOutputKeyClass(Text.class);  

     job.setOutputValueClass(Text.class);     

     FileInputFormat.setInputPaths(job, INPUT_PATH);   

    FileOutputFormat.setOutputPath(job,outputpath);  

     job.waitForCompletion(true);

     }

    }

  • 相关阅读:
    在peoplecode中直接调用SQR
    想起了李雷和韩梅梅
    结婚两周年纪念
    Unix Command Summary
    在PeopleSoft中如何隐藏菜单,导航栏,以及其他定制化链接
    那些朋友们
    整天工作的人为何当不了富翁
    ActiveX简单介绍
    SQL UNION
    Java程序设计问答大全(一)
  • 原文地址:https://www.cnblogs.com/m-study/p/8366698.html
Copyright © 2011-2022 走看看