zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和排序多种输出格式(十一)

      推荐

    MapReduce分析明星微博数据

    http://git.oschina.net/ljc520313/codeexample/tree/master/bigdata/hadoop/mapreduce/05.%E6%98%8E%E6%98%9F%E5%BE%AE%E5%8D%9A%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90?dir=1&filepath=bigdata%2Fhadoop%2Fmapreduce%2F05.%E6%98%8E%E6%98%9F%E5%BE%AE%E5%8D%9A%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&oid=854b4300ccc9fbae894f2f8c29df3ca06193f97b&sha=79a86bf0ff190e38a133bc2446b6b4ad9490f40f  

       

           自定义输入格式,将明星微博数据排序后按粉丝数 关注数 微博数 分别输出到不同文件中。

     

     代码

      1 package zhouls.bigdata.myMapReduce.ScoreCount;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import org.apache.hadoop.io.WritableComparable;
      7 /**
      8 * 学习成绩读写类
      9 * 数据格式参考:19020090017 小讲 90 99 100 89 95
     10 * @author Bertron
     11 * 需要自定义一个 ScoreWritable 类实现 WritableComparable 接口,将学生各门成绩封装起来。
     12 */
     13 public class ScoreWritable implements WritableComparable< Object > {//其实这里,跟TVPlayData一样的
     14 //  注意:    Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
     15 //         Writable接口提供两个方法(write和readFields)。
     16 
     17     
     18     private float Chinese;
     19     private float Math;
     20     private float English;
     21     private float Physics;
     22     private float Chemistry;
     23     
     24     
     25 //    问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
     26 //    答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
     27     
     28     public ScoreWritable(){}//java里的无参构造函数,是用来在创建对象时初始化对象  
     29     //在hadoop的每个自定义类型代码里,好比,现在的ScoreWritable,都必须要写无参构造函数。
     30     
     31     
     32     //问:为什么我们在编程的时候,需要创建一个带有参的构造方法?
     33     //答:就是能让赋值更灵活。构造一般就是初始化数值,你不想别人用你这个类的时候每次实例化都能用另一个构造动态初始化一些信息么(当然没有需要额外赋值就用默认的)。
     34     
     35     public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){//java里的有参构造函数,是用来在创建对象时初始化对象  
     36         this.Chinese = Chinese;
     37         this.Math = Math;
     38         this.English = English;
     39         this.Physics = Physics;
     40         this.Chemistry = Chemistry;
     41     }
     42     
     43     //问:其实set和get方法,这两个方法只是类中的setxxx和getxxx方法的总称,
     44     //    那么,为什么在编程时,有set和set***两个,只有get***一个呢?
     45     
     46     public void set(float Chinese,float Math,float English,float Physics,float Chemistry){
     47         this.Chinese = Chinese;//即float Chinese赋值给private float Chinese;
     48         this.Math = Math;
     49         this.English = English;
     50         this.Physics = Physics;
     51         this.Chemistry = Chemistry;
     52     }
     53 //    public float get(float Chinese,float Math,float English,float Physics,float Chemistry){因为这是错误的,所以对于set可以分开,get只能是get***
     54 //        return Chinese;
     55 //        return Math;
     56 //        return English;
     57 //        return Physics;
     58 //        return Chemistry;
     59 //    }
     60     
     61     
     62     public float getChinese() {//拿值,得返回,所以需有返回类型float
     63         return Chinese;
     64     }
     65     public void setChinese(float Chinese){//设值,不需,所以空返回类型
     66         this.Chinese = Chinese;
     67     }
     68     public float getMath() {//拿值
     69         return Math;
     70     }
     71     public void setMath(float Math){//设值
     72         this.Math = Math;
     73     }
     74     public float getEnglish() {//拿值
     75         return English;
     76     }
     77     public void setEnglish(float English){//设值
     78         this.English = English;
     79     }
     80     public float getPhysics() {//拿值
     81         return Physics;
     82     }
     83     public void setPhysics(float Physics){//设值
     84         this.Physics = Physics;
     85     }
     86     public float getChemistry() {//拿值
     87         return Chemistry;
     88     }
     89     public void setChemistry(float Chemistry) {//拿值
     90         this.Chemistry = Chemistry;
     91     }
     92     
     93     // 实现WritableComparable的readFields()方法
     94 //    对象不能传输的,需要转化成字节流!
     95 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
     96 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
     97     public void readFields(DataInput in) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
     98         Chinese = in.readFloat();//因为,我们这里的对象是float类型,所以是readFloat()
     99         Math = in.readFloat();
    100         English = in.readFloat();//注意:反序列化里,需要生成对象对吧,所以,是用到的是get那边对象
    101         Physics = in.readFloat();
    102         Chemistry = in.readFloat();
    103 //        in.readByte()
    104 //        in.readChar()
    105 //        in.readDouble()
    106 //        in.readLine() 
    107 //        in.readFloat()
    108 //        in.readLong()
    109 //        in.readShort()
    110     }
    111     
    112     // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出 
    113 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
    114 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
    115     public void write(DataOutput out) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
    116         out.writeFloat(Chinese);//因为,我们这里的对象是float类型,所以是writeFloat()
    117         out.writeFloat(Math);
    118         out.writeFloat(English);//注意:序列化里,需要对象对吧,所以,用到的是set那边的对象
    119         out.writeFloat(Physics);
    120         out.writeFloat(Chemistry);
    121 //        out.writeByte()
    122 //        out.writeChar()
    123 //        out.writeDouble()
    124 //        out.writeFloat()
    125 //        out.writeLong()
    126 //        out.writeShort()
    127 //        out.writeUTF()
    128     }
    129     
    130     public int compareTo(Object o) {//java里的比较,Java String.compareTo()
    131         return 0;
    132     }
    133     
    134     
    135 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
    136 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
    137 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
    138     
    139     
    140 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
    141 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
    142 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
    143     
    144     
    145 //  源码是
    146 //    package java.lang;  
    147 //    import java.util.*;      
    148 //    public interface Comparable {  
    149 //        /** 
    150 //        * 将this对象和对象o进行比较,约定:返回负数为小于,零为大于,整数为大于 
    151 //        */  
    152 //        public int compareTo(T o);  
    153 //    }
    154     
    155 }
      1 package zhouls.bigdata.myMapReduce.WeiboCount;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.fs.FSDataInputStream;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.InputSplit;
     11 import org.apache.hadoop.mapreduce.JobContext;
     12 import org.apache.hadoop.mapreduce.RecordReader;
     13 import org.apache.hadoop.mapreduce.TaskAttemptContext;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     16 import org.apache.hadoop.util.LineReader;
     17  19 
     20 
     21 //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
     22 //比如   WeiboInputFormat  extends FileInputFormat implements InputFormat。
     23 
     24 //问:自定义输入格式 WeiboInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
     25 
     26 
     27 public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{
     28     
     29 //    线路是: boolean  isSplitable()   ->   RecordReader<Text,WeiBo> createRecordReader()   ->   WeiboRecordReader extends RecordReader<Text, WeiBo > 
     30 
     31     
     32     @Override
     33     protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
     34             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
     35 //        如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
     36 //        如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
     37         return false;    //整个文件封装到一个InputSplit
     38         //要么就是return true;        //切分64MB大小的一块一块,再封装到InputSplit
     39     }
     40     
     41     
     42     
     43     
     44      @Override
     45      public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0,TaskAttemptContext arg1) throws IOException, InterruptedException{
     46 //         RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
     47 //         createRecordReader是方法,在这里是,WeiboInputFormat.createRecordReader。WeiboInputFormat是InputFormat类的实例
     48 //         InputSplit input和TaskAttemptContext context是传入参数
     49          
     50 //         isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
     51 //         isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
     52          
     53          //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类WeiboRecordReader。
     54          //类似与Excel、WeiBo、TVPlayData代码写法
     55           return new WeiboRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
     56      }
     57 
     58      
     59      
     60      public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
     61              //LineReader      in是1,行号。
     62              //Text line;      俞灏明    俞灏明    10591367    206    558,每行的相关记录
     63             public LineReader in;//行读取器
     64             public Text line;//每行数据类型
     65             public Text lineKey;//自定义key类型,即k1
     66             public WeiBo lineValue;//自定义value类型,即v1
     67             
     68             
     69              @Override
     70                 public void close() throws IOException {//关闭输入流
     71                     if(in !=null){
     72                         in.close();
     73                     }
     74                 }
     75                 @Override
     76                 public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
     77                     return lineKey;//返回类型是Text,即Text lineKey
     78                 }
     79                 @Override
     80                 public WeiBo getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
     81                     return lineValue;//返回类型是WeiBo,即WeiBo lineValue
     82                 }
     83                 @Override
     84                 public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
     85                     return 0;//返回类型是float,即float 0
     86                 }
     87                 
     88                 
     89             
     90             @Override
     91             public void initialize(InputSplit input, TaskAttemptContext context)throws IOException, InterruptedException{//初始化,都是模板
     92                 FileSplit split=(FileSplit)input;//获取split   
     93                 Configuration job=context.getConfiguration();  
     94                 Path file=split.getPath();//得到文件路径     
     95                 FileSystem fs=file.getFileSystem(job); 
     96                 
     97                 FSDataInputStream filein=fs.open(file);//打开文件   
     98                 in=new LineReader(filein,job); //输入流in
     99                 line=new Text();//每行数据类型
    100                 lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
    101                 lineValue = new WeiBo();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
    102             }
    103 
    104             
    105             //此方法读取每行数据,完成自定义的key和value
    106             @Override
    107             public boolean nextKeyValue() throws IOException, InterruptedException{//这里面,才是篡改的重点
    108                 int linesize=in.readLine(line); //line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
    109                 
    110 //              是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
    111 
    112 //              in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
    113 //              in.readLine(str, maxLineLength)//只读到maxLineLength行
    114 //              in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
    115                 
    116                 
    117                 if(linesize==0)  return false; 
    118                 
    119                 //通过分隔符'	',将每行的数据解析成数组 pieces
    120                 String[] pieces = line.toString().split("	"); 
    121                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
    122                 
    123                 if(pieces.length != 5){  
    124                     throw new IOException("Invalid record received");  
    125                 } 
    126                 
    127                 int a,b,c;
    128                 
    129                 try{  
    130                     a = Integer.parseInt(pieces[2].trim());//粉丝,//将String类型,如pieces[2]转换成,float类型,给a  
    131                     b = Integer.parseInt(pieces[3].trim());//关注
    132                     c = Integer.parseInt(pieces[4].trim());//微博数
    133                 }catch(NumberFormatException nfe)
    134                 {  
    135                     throw new IOException("Error parsing floating poing value in record");  
    136                 } 
    137                 
    138                 
    139                 //自定义key和value值
    140                 lineKey.set(pieces[0]); //完成自定义key数据 
    141                 lineValue.set(b, a, c);//完成自定义value数据
    142 //              或者写
    143 //              lineValue.set(Integer.parseInt(pieces[2].trim()),Integer.parseInt(pieces[3].trim()),Integer.parseInt(pieces[4].trim()));
    144 
    145                 
    146 //                pieces[0]   pieces[1] pieces[2]  ... pieces[4]
    147 //                俞灏明    俞灏明    10591367    206    558
    148 //                李敏镐    李敏镐    22898071    11    268
    149 //                大自然保护协会-马云    大自然保护协会-马云    15616866    0    39
    150 //                林心如    林心如    57488649    214    5940
    151 //                时尚小编Anne    时尚小编Anne    10064227    136    2103
    152 //                黄晓明    黄晓明    22616497    506    2011
    153 //                张靓颖    张靓颖    27878708    238    3846
    154 //                张成龙2012    张成龙2012    9813621    199    744
    155 //                吳君如大美女    吳君如大美女    18490338    190    412
    156 //                李娜    李娜    23309493    81    631
    157 //                徐小平    徐小平    11659926    1929    13795
    158 //                唐嫣    唐嫣    24301532    200    2391
    159 //                有斐君    有斐君    8779383    577    4251
    160                 
    161                 
    162                 return true;
    163             }
    164             
    165             
    166             
    167         }
    168 }
      1 package zhouls.bigdata.myMapReduce.WeiboCount;
      2 
      3 import java.io.IOException;
      4 import java.util.Arrays;
      5 import java.util.Comparator;
      6 import java.util.HashMap;
      7 import java.util.Set;
      8 import java.util.Map;
      9 
     10 import org.apache.hadoop.conf.Configuration;
     11 import org.apache.hadoop.conf.Configured;
     12 import org.apache.hadoop.fs.FileSystem;
     13 import org.apache.hadoop.fs.Path;
     14 
     15 import org.apache.hadoop.io.IntWritable;
     16 import org.apache.hadoop.io.Text;
     17 import org.apache.hadoop.mapreduce.Job;
     18 import org.apache.hadoop.mapreduce.Mapper;
     19 import org.apache.hadoop.mapreduce.Reducer;
     20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     22 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
     23 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     24 import org.apache.hadoop.util.Tool;
     25 import org.apache.hadoop.util.ToolRunner;
     26 
     27 public class WeiboCount extends Configured implements Tool{
     28     public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text>{
     29         @Override
     30         protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException{
     31             context.write(new Text("follower"),new Text(key.toString() + "	" + value.getFollowers()));
     32             context.write(new Text("friend"),new Text(key.toString() + "	" + value.getFriends()));
     33             context.write(new Text("statuses"),new Text(key.toString() + "	" + value.getStatuses()));
     34         }
     35     }
     36     
     37     public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
     38         private MultipleOutputs<Text, IntWritable> mos;
     39 
     40         protected void setup(Context context) throws IOException,InterruptedException{
     41             mos = new MultipleOutputs<Text, IntWritable>(context);
     42         }
     43 
     44         private Text text = new Text();
     45 
     46         protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException{
     47             int N = context.getConfiguration().getInt("reduceHasMaxLength", Integer.MAX_VALUE);
     48             Map<String,Integer> m = new HashMap<String,Integer>();
     49             for(Text value:Values){//星型for循环,意思是把Values的值传给Text value
     50                 //value=名称+(粉丝数 或 关注数 或 微博数)
     51                 String[] records = value.toString().split("	");
     52                 m.put(records[0],Integer.parseInt(records[1].toString()));
     53             }
     54             
     55             //对Map内的数据进行排序
     56             Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(m);
     57             for(int i = 0; i< N  &&  i< entries.length;i++){
     58                 if(Key.toString().equals("follower")){
     59                     mos.write("follower",entries[i].getKey(), entries[i].getValue());
     60                 }else if(Key.toString().equals("friend")){
     61                     mos.write("friend", entries[i].getKey(), entries[i].getValue());
     62                 }else if(Key.toString().equals("status")){
     63                     mos.write("statuses", entries[i].getKey(), entries[i].getValue()); 
     64                 }
     65             }               
     66         }
     67 
     68         protected void cleanup(Context context) throws IOException,InterruptedException {
     69             mos.close();
     70         }
     71     }
     72     
     73     
     74     public int run(String[] args) throws Exception{
     75         Configuration conf = new Configuration();// 配置文件对象
     76         Path mypath = new Path(args[1]);
     77         FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
     78         if (hdfs.isDirectory(mypath)){
     79             hdfs.delete(mypath, true);
     80         }
     81 
     82         Job job = new Job(conf, "weibo");// 构造任务
     83         job.setJarByClass(WeiboCount.class);// 主类
     84 
     85         job.setMapperClass(WeiBoMapper.class);// Mapper
     86         job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
     87         job.setMapOutputValueClass(Text.class);// Mapper value输出类型
     88         
     89         job.setReducerClass(WeiBoReducer.class);// Reducer
     90         job.setOutputKeyClass(Text.class);
     91         job.setOutputValueClass(IntWritable.class);
     92         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
     93         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
     94         job.setInputFormatClass(WeiboInputFormat.class);// 自定义输入格式
     95         //自定义文件输出类别
     96         MultipleOutputs.addNamedOutput(job, "follower", TextOutputFormat.class,Text.class, IntWritable.class);
     97         MultipleOutputs.addNamedOutput(job, "friend", TextOutputFormat.class,Text.class, IntWritable.class);
     98         MultipleOutputs.addNamedOutput(job, "status", TextOutputFormat.class,Text.class, IntWritable.class);
     99         job.waitForCompletion(true);
    100         return 0;
    101     }
    102     
    103     
    104     //对Map内的数据进行排序(只适合小数据量)
    105     public static Map.Entry[] getSortedHashtableByValue(Map h){  
    106         Set set = h.entrySet();  
    107         Map.Entry[] entries = (Map.Entry[]) set.toArray(new Map.Entry[set.size()]);  
    108         Arrays.sort(entries, new Comparator(){  
    109             public int compare(Object arg0, Object arg1){  
    110                 Long key1 = Long.valueOf(((Map.Entry) arg0).getValue().toString());  
    111                 Long key2 = Long.valueOf(((Map.Entry) arg1).getValue().toString());  
    112             return key2.compareTo(key1);  
    113         } });
    114         return entries;  
    115     }
    116     
    117     public static void main(String[] args) throws Exception{
    118 //        String[] args0 = { "hdfs://HadoopMaster:9000/weibo/weibo.txt",
    119 //                "hdfs://HadoopMaster:9000/out/weibo/" };
    120         
    121         String[] args0 = { "./data/weibo/weibo.txt",
    122         "./out/weibo/" };
    123         
    124         int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
    125         System.exit(ec);
    126     }
    127 }
    128         
  • 相关阅读:
    设计模式系列
    Python3 系列之 可变参数和关键字参数
    设计模式系列
    【HANA系列】SAP HANA ODBC error due to mismatch of version
    【FICO系列】SAP FICO FS00修改科目为未清项目管理
    【FIORI系列】SAP OpenUI5 (SAPUI5) js框架简单介绍
    【HANA系列】SAP HANA SQL获取当前日期加若干天后的日期
    【HANA系列】SAP HANA SQL获取本周的周一
    【HANA系列】SAP HANA SQL获取当前日期
    【HANA系列】SAP HANA SQL获取当前日期最后一天
  • 原文地址:https://www.cnblogs.com/zlslch/p/6164435.html
Copyright © 2011-2022 走看看