zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之wordcount版本5(九)

      这篇博客,给大家,体会不一样的版本编程。

    代码

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    //4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
    //map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
    //默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    //mapreduce框架每读一行数据就调用一次该方法
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中 key-value
    //key 是这一行数据的起始偏移量 value 是这一行的文本内容

    //将这一行的内容转换成string类型
    String line = value.toString();

    //对这一行的文本按特定分隔符切分
    String[] words = StringUtils.split(line, " ");

    //遍历这个单词数组输出为kv形式 k:单词 v : 1
    for(String word : words){

    context.write(new Text(word), new LongWritable(1));

    }

    }



    }

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{



    //框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法
    //<hello,{1,1,1,1,1,1.....}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
    throws IOException, InterruptedException {

    long count = 0;
    //遍历value的list,进行累加求和
    for(LongWritable value:values){

    count += value.get();
    }

    //输出这一个单词的统计结果

    context.write(key, new LongWritable(count));

    }

    }

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import zhouls.bigdata.myMapReduce.Anagram.Anagram;

    /**
    * 用来描述一个特定的作业
    * 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
    * 还可以指定该作业要处理的数据所在的路径
    * 还可以指定改作业输出的结果放到哪个路径
    * ....
    *
    *
    */
    public class WCRunner implements Tool {
    public int run(String[] arg0) throws Exception {
    Configuration conf = new Configuration();
    //2删除已经存在的输出目录
    Path mypath = new Path(arg0[1]);//下标为1,即是输出路径
    FileSystem hdfs = mypath.getFileSystem(conf);//获取文件系统
    if (hdfs.isDirectory(mypath))
    {//如果文件系统中存在这个输出路径,则删除掉
    hdfs.delete(mypath, true);
    }

    Job wcjob = new Job(conf, "WC");//构建一个job对象,取名为testAnagram

    //设置整个job所用的那些类在哪个jar包
    wcjob.setJarByClass(WCRunner.class);


    //本job使用的mapper和reducer的类
    wcjob.setMapperClass(WCMapper.class);
    wcjob.setReducerClass(WCReducer.class);


    //指定reduce的输出数据kv类型
    wcjob.setOutputKeyClass(Text.class);
    wcjob.setOutputValueClass(LongWritable.class);

    //指定mapper的输出数据kv类型
    wcjob.setMapOutputKeyClass(Text.class);
    wcjob.setMapOutputValueClass(LongWritable.class);





    FileInputFormat.addInputPath(wcjob, new Path(arg0[0]));// 文件输入路径
    FileOutputFormat.setOutputPath(wcjob, new Path(arg0[1]));// 文件输出路径
    //将job提交给集群运行
    wcjob.waitForCompletion(true);

    return 0;

    }


    public static void main(String[] args) throws Exception
    {//定义数组来保存输入路径和输出路径
    //集群路径
    // String[] args0 = { "hdfs://HadoopMaster:9000/wc.txt",
    // "hdfs://HadoopMaster:9000/out/wc/"};

    //本地路径
    String[] args0 = { "./data/wc.txt",
    "out/wc/"};

    int ec = ToolRunner.run( new Configuration(), new WCRunner(), args0);
    System. exit(ec);
    }


    @Override
    public Configuration getConf() {
    // TODO Auto-generated method stub
    return null;
    }


    @Override
    public void setConf(Configuration arg0) {
    // TODO Auto-generated method stub

    }


    }

  • 相关阅读:
    DataStructure 插入排序(Insertion Sort)
    DataStructure 冒泡排序(Bubble Sort)
    Flex【原创】BitmapData高级渲染、实例展示、源码下载
    Flex 运用ProductManager重新启动Air程序
    DataStructure 按位运算方法
    Flex【原创】Air调用本地exe文件完成截图功能
    JS鼠标移动切换图片
    Jquery在线引用地址:
    css 使div垂直、水平居中
    声音播放解决方案(C#)
  • 原文地址:https://www.cnblogs.com/zlslch/p/6163830.html
Copyright © 2011-2022 走看看