zoukankan      html  css  js  c++  java
  • Hadoop学习笔记2

    转载请标注原链接http://www.cnblogs.com/xczyd/p/8608906.html

    Hdfs学习笔记1 - 使用Java API访问远程hdfs集群中,我们已经可以完成了访问hdfs的配置。

    接下来我们试图写一个最简单的map reduce程序。网上一般给的Demo都是统计词频(Word Count),

    于是我们也简单先实现一下:

    首先准备一个内容大致如下的test.txt文件:

    aa
    bbb
    aaa
    ab
    ba
    bb
    bbb
    bba
    baa
    aa
    aaa
    aa
    aab

    每行有且仅有一个单词,然后我们的程序需要完成的任务是统计每个词出现的次数。代码如下:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.*;
    import java.util.StringTokenizer;
    
    public class MRTest {
    
        public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable> {
    
            private Text word = new Text();
            private final static IntWritable one = new IntWritable(1);
    
            //key: 行号//value: 单词
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, 
           InterruptedException { StringTokenizer tokenizer
    = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); word.set(token); context.write(word, one); } } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text,
         IntWritable> { private IntWritable result = new IntWritable(); //key: 单词 //values: 在map阶段写入的one的数量 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context)
           throws IOException, InterruptedException { int sum = 0; for (IntWritable value: values) { sum += value.get(); } result.set(sum); context.write(key, result); } }

       public static void main(String[] args) throws IOException, ClassNotFoundException,
         InterruptedException { System.setProperty(
    "HADOOP_USER_NAME", "root"); String hdfsUserName = "root"; String inputPath = "/jzhang4/test2.txt"; String outputPath = "/jzhang4/out"; Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://hd-nn-test-01:9000"); conf.set("dfs.client.use.datanode.hostname","true"); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(outputPath), true); Job job = Job.getInstance(conf); job.setJarByClass(MRTest.class); job.setJobName("jzhang4_avg_calc"); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); TextInputFormat.setInputPaths(job, new Path(inputPath)); TextOutputFormat.setOutputPath(job, new Path(outputPath)); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); while (true) { if (job.waitForCompletion(true)) { break; } else { Thread.sleep(1000); } } } }

    首先在map阶段,我们每读到一个word,就往context里面对应的word上添加一个1;

    然后再reduce阶段,对于每一个word,再把这些1给累加起来;

    main函数中有一些对于job的配置,主要是为了在IDE中运行此map reduce程序,

    如果想要打成jar包放到服务器上去运行,或者是想在yarn上执行,需要调整配置,这些内容不在本篇讨论范围之内。

    马不停蹄(事实上停了一天|||- -),立刻开始第二个Map Reduce程序。给定如下这么一个文件:

    10
    9
    8
    7
    6
    5
    4
    3
    2
    1
    0

    每行有且仅有一个数字。写一个map reduce程序来计算所有数字的平均值。

    第一反应当然是照抄一下上面的程序,只是在map阶段往context里写入要统计的数字本身,

    然后reduce阶段把这些数字加和再除以数字的数目即可。

    但是老板说,这么操作过于naive。于是想到了map reduce中的计数器(Recorders),利用计数器可以

    在reduce阶段什么也不做就得到均值。于是代码如下:

    public static class CalculateAvgMap extends Mapper<Object, Text, Text, 
      IntWritable> {   private final static IntWritable one = new IntWritable(1);   public enum Recorders {     ItemCounter,     ValueCounter   }   //key: 行号   //value: 数字   @Override   protected void map(Object key, Text value, Context context) throws IOException,
        InterruptedException {     StringTokenizer tokenizer
    = new StringTokenizer(value.toString());     while (tokenizer.hasMoreTokens()) {       int number = Integer.parseInt(tokenizer.nextToken());       context.getCounter(Recorders.ItemCounter).increment(1);       context.getCounter(Recorders.ValueCounter).increment(number);     }   } } public static class CalculateAvgReduce extends Reducer<Text, IntWritable, Text,
      IntWritable> {   private DoubleWritable result = new DoubleWritable();   @Override   protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {   } }

    然后在适当的地方(比如main函数中)获得计数器的值即可:

     public static void main(String[] args) throws IOException, ClassNotFoundException, 
         InterruptedException { System.setProperty(
    "HADOOP_USER_NAME", "root"); String hdfsUserName = "root"; String inputPath = "/jzhang4/test2.txt"; String outputPath = "/jzhang4/out"; Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://hd-nn-test-01:9000"); conf.set("dfs.client.use.datanode.hostname","true"); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(outputPath), true); Job job = Job.getInstance(conf); job.setJarByClass(MRTest.class); job.setJobName("jzhang4_avg_calc"); job.setMapperClass(CalculateAvgMap.class); job.setReducerClass(CalculateAvgReduce.class); TextInputFormat.setInputPaths(job, new Path(inputPath)); TextOutputFormat.setOutputPath(job, new Path(outputPath)); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
    while (true) { if (job.waitForCompletion(true)) { break; } else { Thread.sleep(1000); } } Counters counters = job.getCounters(); Counter itemCounter = counters.findCounter(
           CalculateAvgMap.Recorders.ItemCounter);
    System.out.println(itemCounter.getName()
    + " " + itemCounter.getValue()); Counter valueCounter = counters.findCounter(
           CalculateAvgMap.Recorders.ValueCounter);
    System.out.println(valueCounter.getName()
    + " " + valueCounter.getValue()); }

    这个实现有一定的问题,就是加合的结果会存在超界的风险。如果想要避免超界,比较合适的做法是利用bitmap的思想,

    开32个或者64个Recorder,分别记录每一个bit的count数。当然在enum里面写茫茫多Recorder有点蠢,

    在找到更优雅的办法之前,暂时先不写这么丑陋的代码了...

  • 相关阅读:
    ironic port bind
    pdb /usr/bin/neutron-server
    networking_generic_switch
    [CodeForces586D]Phillip and Trains
    [CodeForces598D]Igor In the Museum
    [poj3468]A Simple Problem with Integers
    [bzoj1503][NOI2004]郁闷的出纳员
    [bzoj1208][HNOI2004]宠物收养所
    [luogu3384][模板]树链剖分
    [CodeForces869A]The Artful Expedient
  • 原文地址:https://www.cnblogs.com/xczyd/p/8608906.html
Copyright © 2011-2022 走看看