zoukankan      html  css  js  c++  java
  • MapReduce的手机流量统计的案例

    程序:(另外一个关于单词计数的总结:http://www.cnblogs.com/DreamDrive/p/5492572.html)
      1 import java.io.IOException;
      2 
      3 import mapreduce.WordCountApp.WordCountMapper.WordCountReducer;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.Mapper;
     11 import org.apache.hadoop.mapreduce.Reducer;
     12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     14 
     15 /**
     16  * 以文本
     17  * hello    you
     18  * hello    me
     19  * 为例子.
     20  * map方法调用了两次,因为有两行
     21  * k2 v2 键值对的数量有几个?
     22  * 有4个.有四个单词.
     23  * 
     24  * 会产生几个分组?
     25  * 产生3个分组.
     26  * 有3个不同的单词.
     27  *
     28  */
     29 public class WordCountApp {
     30     public static void main(String[] args) throws Exception {
     31         //程序在这里运行,要有驱动.
     32         Configuration conf = new Configuration();
     33         Job job = Job.getInstance(conf,WordCountApp.class.getSimpleName());
     34         
     35         //我们运行此程序通过运行jar包来执行.一定要有这句话.
     36         job.setJarByClass(WordCountApp.class);
     37         
     38         FileInputFormat.setInputPaths(job,args[0]);
     39         
     40         job.setMapperClass(WordCountMapper.class);//设置Map类
     41         job.setMapOutputKeyClass(Text.class);//设置Map的key
     42         job.setMapOutputValueClass(LongWritable.class);//设置Map的value
     43         job.setReducerClass(WordCountReducer.class);//设置Reduce的类
     44         job.setOutputKeyClass(Text.class);//设置Reduce的key Reduce这个地方只有输出的参数可以设置. 方法名字也没有Reduce关键字区别于Map
     45         job.setOutputValueClass(LongWritable.class);//设置Reduce的value.
     46         
     47         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     48         job.waitForCompletion(true);//表示结束了才退出,不结束不退出
     49     }
     50     /**
     51      * 4个泛型的意识
     52      * 第一个是LongWritable,固定就是这个类型,表示每一行单词的起始位置(单位是字节)
     53      * 第二个是Text,表示每一行的文本内容.
     54      * 第三个是Text,表示单词
     55      * 第四个是LongWritable,表示单词的出现次数
     56      */
     57     public static class WordCountMapper extends Mapper<LongWritable, Text, Text    ,LongWritable>{
     58         Text k2 = new Text();
     59         LongWritable v2 = new LongWritable();
     60         //增加一个计数器,这个Map调用几次就输出对应的次数.
     61         int counter = 0;
     62         
     63         
     64         /**
     65          * key和value表示输入的信息
     66          * 每一行文本调用一次map函数
     67          */
     68         @Override
     69         protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)    
     70                 throws IOException, InterruptedException {
     71             counter  = counter + 1;
     72             System.out.println("mapper 调用的次数:" + counter);
     73             //这个map方法中的Mapper的各个泛型和上面的意识是一样的,分别代表的是k1,v1,k2,v2
     74             String line = value.toString();
     75             System.out.println(String.format("<k1,v1>的值<"+key.get()+","+line+">"));
     76             String[] splited = line.split("	");
     77             for (String word : splited) {
     78                 k2.set(word);
     79                 v2.set(1);
     80                 System.out.println(String.format("<k2,v2>的值<"+k2.toString()+","+v2.get()+">"));
     81                 context.write(k2, v2);//通过context对象写出去.
     82             }
     83         }
     84         /**
     85          * 这个地方的四个泛型的意思
     86          * 前两个泛型是对应的Map方法的后两个泛型.
     87          * Map的输出对应的是Reduce的输入.
     88          * 第一个Text是单词
     89          * 第二个LongWritable是单词对应的次数
     90          * 我们想输出的也是单词 和 次数
     91          * 所以第三个和第四个的类型和第一和第二个的一样
     92          * 
     93          * 分组指的是把相同key2的value2放到一个集合中
     94          *
     95          */
     96         public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
     97             LongWritable v3 = new LongWritable();
     98             //增加一个计数器,这个Reduce调用几次就输出对应的次数.
     99             int counter = 0;
    100             
    101             /**
    102              * 每一个分组调用一次reduce函数
    103              * 过来的k2 分别是hello you me
    104              * 
    105              */
    106             @Override
    107             protected void reduce(Text key2, Iterable<LongWritable> value2Iterable,Reducer<Text, LongWritable, Text, 
    108                     LongWritable>.Context context)
    109                             throws IOException, InterruptedException {
    110                 counter  = counter + 1;
    111                 System.out.println("reducer 调用的次数:" + counter);
    112                 //第一个参数是单词,第二个是可迭代的集合. 为什么上面的LongWritable类型的对象value2变成了一个可以迭代的结合参数?
    113                 //因为分组指的是把相同key2的value2放到一个集合中
    114                 long sum = 0L;
    115                 for (LongWritable value2 : value2Iterable) {
    116                     System.out.println(String.format("<k2,v2>的值<"+key2.toString()+","+value2.toString()+">"));
    117                     sum += value2.get(); //这个value2是LongWritable类型的,不能进行+= 操作,要用get()得到其对应的java基本类型.
    118                     //sum表示单词k2 在整个文本中的出现次数.
    119                 }
    120                 v3.set(sum);
    121                 context.write(key2, v3);
    122                 System.out.println(String.format("<k3,v3>的值<"+key2.toString()+","+v3.get()+">"));
    123             }
    124         }
    125     }
    126 }

    三:查看结果

    打包上传到Hadoop集群,然后执行命令运行.详细运行过程不再写了.........

    //==============================================================================================

    程序二:

      1 /*
      2  * 一个hello文件内容如下:
      3  *   hello        you
      4  *   hello        me
      5  */
      6 import java.io.IOException;
      7 
      8 import org.apache.hadoop.conf.Configuration;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.io.LongWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 
     18 public class WordCountApp {
     19     public static void main(String[] args) throws Exception {
     20         // 在main方法写驱动程序,把Map函数和Reduce函数组织在一起.
     21         // 搞一个对象把Map对象和Reduce对象都放在这个对象中,我们把这个对象称作Job
     22         // 两个形参,一个是Configuration对象,一个是Job的名称,这样获得了一个Job对象;
     23         Job job = Job.getInstance(new Configuration(),
     24                 WordCountApp.class.getSimpleName());
     25         // 对这个job进行设置
     26         job.setJarByClass(WordCountApp.class);// 通过这个设置可以让框架识别你写的代码
     27         
     28         job.setMapperClass(MyMapper.class);// 把自定义的Map类放到job中
     29         job.setMapOutputKeyClass(Text.class);// 定义Map的key的输出类型,Map的输出是<hello,2>
     30         job.setMapOutputValueClass(LongWritable.class);// 定义Map的value的输出类型
     31 
     32         job.setReducerClass(MyReducer.class);// 把自定义的Reducer类放到job中
     33         job.setOutputKeyClass(Text.class);// 因为Reduce的输出是最终的数据,Reduce的输出是<hello,2>
     34         // 所以这个方法名中没有像Map对应的放发一样带有Reduce,直接就是setOutputKeyClass
     35         job.setOutputValueClass(LongWritable.class);// 定义reduce的value输出
     36 
     37         FileInputFormat.setInputPaths(job, args[0]);// 输入指定:传入一个job地址.
     38         // 这个args[0] 就是新地址,"hdfs://192.168.0.170/hello"
     39         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     40         // 输出指定
     41         // 指定输入和输出路径可以通过在这里写死的方式,也可以通过main函数参数的形式
     42         // 分别是args[0]和args[1]
     43 
     44         // 把job上传到yarn平台上.
     45         job.waitForCompletion(true);
     46     }
     47 
     48     /*
     49      * 对于<k1,v1>而言,每一行产生一个<k1,v1>对,<k1,v1>表示<行的起始位置,行的文本内容>
     50      * 就本例而言map函数总共调用两次,因为总共只有两行.
     51      * 正对要统计的文本内容可以知道总共两行,总共会调用两次Map函数对应产生的<k1,v1>分别是<0,hello you>
     52      * 和第二个<k1,v1>是<10,hello me>
     53      */
     54     private static class MyMapper extends
     55             Mapper<LongWritable, Text, Text, LongWritable> {
     56         // 这个Mapper的泛型参数是<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 分别对应的是k1,v1,k2,v2
     57         // 我们如下讲的k1,v1的类型是固定的.
     58         // 就本例而言,map函数会被调用2次,因为总共文本文件就只有两行.
     59         
     60         //要定义输出的k2和v2.本案例中可以分析出<k2,v2>是对文本内容的统计<hello,1><hello,1><you,1><me,1>
     61         //而且<k2,v2>的内容是和<k3,v3>中的内容是一样的.
     62         Text k2 = new Text();
     63         LongWritable v2 = new LongWritable();
     64         //重写父类Mapper中的map方法
     65         @Override
     66         protected void map(LongWritable key, Text value,
     67                 Mapper<LongWritable, Text, Text, LongWritable>.Context context)
     68                 throws IOException, InterruptedException {
     69             //通过代码或者案例分析就可以知道k1其实没有什么用出的.
     70             String line = value.toString();
     71             String[] splited = line.split("	");//根据制表分隔符机进行拆分.hello和me,you之间是一个制表分隔符.
     72             for (String word : splited) {
     73                 k2.set(word);
     74                 v2.set(1);
     75                 context.write(k2, v2);
     76                 //用context把k2,v2写出去,框架会写,不用我们去管.
     77             }
     78         }
     79     }
     80 
     81     private static class MyReducer extends
     82             Reducer<Text, LongWritable, Text, LongWritable> {
     83         //这个例子中的<k2,v2>和<k3,v3>中的k是一样的,所以这里,k2当做k3了.
     84         LongWritable v3 = new LongWritable();
     85         @Override
     86         protected void reduce(Text k2, Iterable<LongWritable> v2s,
     87                 Reducer<Text, LongWritable, Text, LongWritable>.Context context)
     88                 throws IOException, InterruptedException {
     89             //Reduce是对上面Map中的结果进行汇总的.
     90             //上面拆分出来的<k2,v2>是<hello,1><hello,1><you,1><me,1>Reduce方法中就要对其进行汇总.
     91             long sum = 0L;
     92             for(LongWritable v2:v2s){
     93                 sum = sum +v2.get();//sum是long类型,v2是LongWritable类型
     94                 //LongWritable类型转换成long类型用get()方法.
     95                 //sum的值表示单词在整个文件中出现的中次数.
     96             }
     97             v3.set(sum);
     98             context.write(k2,v3);
     99         }
    100     }
    101 }
  • 相关阅读:
    HDU5032 Always Cook Mushroom(树状数组&&离线)
    vue proxyTable
    vue-bus 组件通信插件
    gulp 静态资源版本控制
    js运算【按位非】~
    JS 的引用赋值与传值赋值
    手机端取消长按选中
    无刷新URL 更新
    移动端设计稿尺寸(微信端)
    4105: [Thu Summer Camp 2015]平方运算
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5494866.html
Copyright © 2011-2022 走看看