zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)

      不多说,直接上代码。

     

    代码

      1 package zhouls.bigdata.myMapReduce.ScoreCount;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import org.apache.hadoop.io.WritableComparable;
      7 /**
      8 * 学习成绩读写类
      9 * 数据格式参考:19020090017 小讲 90 99 100 89 95
     10 * @author Bertron
     11 * 需要自定义一个 ScoreWritable 类实现 WritableComparable 接口,将学生各门成绩封装起来。
     12 */
     13 public class ScoreWritable implements WritableComparable< Object > {//其实这里,跟TVPlayData一样的
     14 //  注意:    Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
     15 //         Writable接口提供两个方法(write和readFields)。
     16 
     17     
     18     private float Chinese;
     19     private float Math;
     20     private float English;
     21     private float Physics;
     22     private float Chemistry;
     23     
     24     
     25 //    问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
     26 //    答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
     27     
     28     public ScoreWritable(){}//java里的无参构造函数,是用来在创建对象时初始化对象  
     29     //在hadoop的每个自定义类型代码里,好比,现在的ScoreWritable,都必须要写无参构造函数。
     30     
     31     
     32     //问:为什么我们在编程的时候,需要创建一个带有参的构造方法?
     33     //答:就是能让赋值更灵活。构造一般就是初始化数值,你不想别人用你这个类的时候每次实例化都能用另一个构造动态初始化一些信息么(当然没有需要额外赋值就用默认的)。
     34     
     35     public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){//java里的有参构造函数,是用来在创建对象时初始化对象  
     36         this.Chinese = Chinese;
     37         this.Math = Math;
     38         this.English = English;
     39         this.Physics = Physics;
     40         this.Chemistry = Chemistry;
     41     }
     42     
     43     //问:其实set和get方法,这两个方法只是类中的setxxx和getxxx方法的总称,
     44     //    那么,为什么在编程时,有set和set***两个,只有get***一个呢?
     45     
     46     public void set(float Chinese,float Math,float English,float Physics,float Chemistry){
     47         this.Chinese = Chinese;//即float Chinese赋值给private float Chinese;
     48         this.Math = Math;
     49         this.English = English;
     50         this.Physics = Physics;
     51         this.Chemistry = Chemistry;
     52     }
     53 //    public float get(float Chinese,float Math,float English,float Physics,float Chemistry){因为这是错误的,所以对于set可以分开,get只能是get***
     54 //        return Chinese;
     55 //        return Math;
     56 //        return English;
     57 //        return Physics;
     58 //        return Chemistry;
     59 //    }
     60     
     61     
     62     public float getChinese() {//拿值,得返回,所以需有返回类型float
     63         return Chinese;
     64     }
     65     public void setChinese(float Chinese){//设值,不需,所以空返回类型
     66         this.Chinese = Chinese;
     67     }
     68     public float getMath() {//拿值
     69         return Math;
     70     }
     71     public void setMath(float Math){//设值
     72         this.Math = Math;
     73     }
     74     public float getEnglish() {//拿值
     75         return English;
     76     }
     77     public void setEnglish(float English){//设值
     78         this.English = English;
     79     }
     80     public float getPhysics() {//拿值
     81         return Physics;
     82     }
     83     public void setPhysics(float Physics){//设值
     84         this.Physics = Physics;
     85     }
     86     public float getChemistry() {//拿值
     87         return Chemistry;
     88     }
     89     public void setChemistry(float Chemistry) {//拿值
     90         this.Chemistry = Chemistry;
     91     }
     92     
     93     // 实现WritableComparable的readFields()方法
     94 //    对象不能传输的,需要转化成字节流!
     95 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
     96 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
     97     public void readFields(DataInput in) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
     98         Chinese = in.readFloat();//因为,我们这里的对象是float类型,所以是readFloat()
     99         Math = in.readFloat();
    100         English = in.readFloat();//注意:反序列化里,需要生成对象对吧,所以,是用到的是get那边对象
    101         Physics = in.readFloat();
    102         Chemistry = in.readFloat();
    103 //        in.readByte()
    104 //        in.readChar()
    105 //        in.readDouble()
    106 //        in.readLine() 
    107 //        in.readFloat()
    108 //        in.readLong()
    109 //        in.readShort()
    110     }
    111     
    112     // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出 
    113 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
    114 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
    115     public void write(DataOutput out) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
    116         out.writeFloat(Chinese);//因为,我们这里的对象是float类型,所以是writeFloat()
    117         out.writeFloat(Math);
    118         out.writeFloat(English);//注意:序列化里,需要对象对吧,所以,用到的是set那边的对象
    119         out.writeFloat(Physics);
    120         out.writeFloat(Chemistry);
    121 //        out.writeByte()
    122 //        out.writeChar()
    123 //        out.writeDouble()
    124 //        out.writeFloat()
    125 //        out.writeLong()
    126 //        out.writeShort()
    127 //        out.writeUTF()
    128     }
    129     
    130     public int compareTo(Object o) {//java里的比较,Java String.compareTo()
    131         return 0;
    132     }
    133     
    134     
    135 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
    136 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
    137 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
    138     
    139     
    140 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
    141 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
    142 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
    143     
    144     
    145 //  源码是
    146 //    package java.lang;  
    147 //    import java.util.*;      
    148 //    public interface Comparable {  
    149 //        /** 
    150 //        * 将this对象和对象o进行比较,约定:返回负数为小于,零为大于,整数为大于 
    151 //        */  
    152 //        public int compareTo(T o);  
    153 //    }
    154     
    155 }
      1 package zhouls.bigdata.myMapReduce.ScoreCount;
      2 
      3 import java.io.IOException;
      4 import org.apache.hadoop.conf.Configuration;
      5 import org.apache.hadoop.fs.FSDataInputStream;
      6 import org.apache.hadoop.fs.FileSystem;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.InputSplit;
     10 import org.apache.hadoop.mapreduce.JobContext;
     11 import org.apache.hadoop.mapreduce.RecordReader;
     12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     15 import org.apache.hadoop.util.LineReader;
     16 /**
     17 * 自定义学生成绩读写InputFormat
     18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
     19 * @author Bertron
     20 */
     21 
     22             //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
     23             //比如   ScoreInputFormat  extends FileInputFormat implements InputFormat。
     24 
     25             //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
     26 
     27 public class ScoreInputFormat extends FileInputFormat<Text,ScoreWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看
     28 
     29 //    线路是: boolean  isSplitable()   ->   RecordReader<Text,ScoreWritable> createRecordReader()   ->   ScoreRecordReader extends RecordReader<Text, ScoreWritable > 
     30     
     31     @Override
     32     protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
     33             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
     34 //        如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
     35 //        如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
     36         return false;    //整个文件封装到一个InputSplit
     37         //要么就是return true;        //切分64MB大小的一块一块,再封装到InputSplit
     38     }
     39     
     40     @Override
     41     public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
     42 //        RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
     43 //        createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
     44 //        InputSplit input和TaskAttemptContext context是传入参数
     45         
     46 //        isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
     47 //        isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
     48         
     49         //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
     50         //类似与Excel、WeiBo、TVPlayData代码写法
     51         return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
     52     }
     53     
     54     
     55     //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩
     56     public static class ScoreRecordReader extends RecordReader<Text, ScoreWritable > {//RecordReader<k1, v1>是一个整体
     57         public LineReader in;//行读取器
     58         public Text line;//每行数据类型
     59         public Text lineKey;//自定义key类型,即k1
     60         public ScoreWritable lineValue;//自定义value类型,即v1
     61         
     62         @Override
     63         public void close() throws IOException {//关闭输入流
     64             if(in !=null){
     65                 in.close();
     66             }
     67         }
     68         @Override
     69         public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
     70             return lineKey;//返回类型是Text,即Text lineKey
     71         }
     72         @Override
     73         public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
     74             return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
     75         }
     76         @Override
     77         public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
     78             return 0;//返回类型是float,即float 0
     79         }
     80         @Override
     81         public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
     82             FileSplit split=(FileSplit)input;
     83             Configuration job=context.getConfiguration();
     84             Path file=split.getPath();
     85             FileSystem fs=file.getFileSystem(job);
     86             
     87             FSDataInputStream filein=fs.open(file);
     88             in=new LineReader(filein,job);//输入流in
     89             line=new Text();//每行数据类型
     90             lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
     91             lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
     92         }
     93         
     94         //此方法读取每行数据,完成自定义的key和value
     95         @Override
     96         public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
     97             int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
     98             
     99 //            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
    100             
    101 //            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
    102 //            in.readLine(str, maxLineLength)//只读到maxLineLength行
    103 //            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
    104 
    105             if(linesize==0) return false;
    106             
    107             
    108             String[] pieces = line.toString().split("\s+");//解析每行数据
    109                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
    110             
    111             if(pieces.length != 7){
    112                 throw new IOException("Invalid record received");
    113             }
    114             //将学生的每门成绩转换为 float 类型
    115             float a,b,c,d,e;
    116             try{
    117                 a = Float.parseFloat(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
    118                 b = Float.parseFloat(pieces[3].trim());
    119                 c = Float.parseFloat(pieces[4].trim());
    120                 d = Float.parseFloat(pieces[5].trim());
    121                 e = Float.parseFloat(pieces[6].trim());
    122             }catch(NumberFormatException nfe){
    123                 throw new IOException("Error parsing floating poing value in record");
    124             }
    125             lineKey.set(pieces[0]+"	"+pieces[1]);//完成自定义key数据
    126             lineValue.set(a, b, c, d, e);//封装自定义value数据
    127 //            或者写
    128 //            lineValue.set(Float.parseFloat(pieces[2].trim()),Float.parseFloat(pieces[3].trim()),Float.parseFloat(pieces[4].trim()),
    129 //                    Float.parseFloat(pieces[5].trim()),Float.parseFloat(pieces[6].trim()));
    130             
    131 //            pieces[0]   pieces[1] pieces[2]  ... pieces[6]
    132 //            19020090040 秦心芯 123 131 100 95 100
    133 //            19020090006 李磊 99 92 100 90 100
    134 //            19020090017 唐一建 90 99 100 89 95
    135 //            19020090031 曾丽丽 100 99 97 79 96
    136 //            19020090013 罗开俊 105 115 94 45 100
    137 //            19020090039 周世海 114 116 93 31 97
    138 //            19020090020 王正伟 109 98 88 47 99
    139 //            19020090025 谢瑞彬 94 120 100 50 73
    140 //            19020090007 于微 89 78 100 66 99
    141 //            19020090012 刘小利 87 82 89 71 99
    142             
    143             
    144             
    145             return true;
    146         }        
    147     }
    148 }
     1 package zhouls.bigdata.myMapReduce.ScoreCount;
     2 
     3 
     4 import java.io.IOException;
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.conf.Configured;
     7 import org.apache.hadoop.fs.FileSystem;
     8 import org.apache.hadoop.fs.Path;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Job;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 import org.apache.hadoop.mapreduce.Reducer;
    13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    15 import org.apache.hadoop.util.Tool;
    16 import org.apache.hadoop.util.ToolRunner;
    17 /**
    18 * 学生成绩统计Hadoop程序
    19 * 数据格式参考:19020090017 小讲 90 99 100 89 95
    20 * @author HuangBQ
    21 */
    22 public class ScoreCount extends Configured implements Tool{
    23     public static class ScoreMapper extends Mapper<Text,ScoreWritable,Text,ScoreWritable>{
    24         @Override
    25         protected void map(Text key, ScoreWritable value, Context context)throws IOException, InterruptedException{
    26             context.write(key, value);//写入key是k2,value是v2
    27 //            context.write(new Text(key), new ScoreWritable(value));等价           
    28         }
    29     }
    30     
    31     public static class ScoreReducer extends Reducer<Text,ScoreWritable,Text,Text>{
    32         private Text text = new Text();
    33         protected void reduce(Text Key, Iterable< ScoreWritable > Values, Context context)throws IOException, InterruptedException{
    34             float totalScore=0.0f;
    35             float averageScore = 0.0f;
    36             for(ScoreWritable ss:Values){
    37                 totalScore +=ss.getChinese()+ss.getMath()+ss.getEnglish()+ss.getPhysics()+ss.getChemistry();
    38                 averageScore +=totalScore/5;
    39             }
    40             text.set(totalScore+"	"+averageScore);
    41             context.write(Key, text);//写入Key是k3,text是v3
    42 //            context.write(new Text(Key),new Text(text));等价            
    43         }
    44     }
    45     
    46 
    47     public int run(String[] args) throws Exception{
    48         Configuration conf = new Configuration();//读取配置文件
    49         
    50         Path mypath = new Path(args[1]);
    51         FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
    52         if (hdfs.isDirectory(mypath)) 
    53         {
    54             hdfs.delete(mypath, true);
    55         }
    56         
    57         Job job = new Job(conf, "ScoreCount");//新建任务
    58         job.setJarByClass(ScoreCount.class);//设置主类
    59         
    60         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
    61         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
    62         
    63         job.setMapperClass(ScoreMapper.class);// Mapper
    64         job.setReducerClass(ScoreReducer.class);// Reducer
    65         
    66         job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
    67         job.setMapOutputValueClass(ScoreWritable.class);// Mapper value输出类型
    68                 
    69         job.setInputFormatClass(ScoreInputFormat.class);//设置自定义输入格式
    70         
    71         job.waitForCompletion(true);        
    72         return 0;
    73     }
    74     
    75     
    76     
    77     public static void main(String[] args) throws Exception{
    78 //        String[] args0 = 
    79 //                { 
    80 //                "hdfs://HadoopMaster:9000/score/score.txt",
    81 //                "hdfs://HadoopMaster:9000/out/score/" 
    82 //                };
    83         
    84         String[] args0 = 
    85             { 
    86             "./data/score/score.txt",
    87             "./out/score/" 
    88             };
    89         
    90         int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
    91         System.exit(ec);
    92     }
    93 }
  • 相关阅读:
    160309_Qt Essentials
    160309_Qt Reference Documentation
    160308_Signals & Slots
    160308_Helloworld_Gui Application
    网络爬虫(14)-动态页面爬取
    数据分析(6)-Pandas日期数据处理
    mysql基础(2)-excel功能在excel中如何实现?
    数据分析(5)-数据可视化常用图表类型和使用场景
    财经数据(6)-Python多进程爬虫东方财富个股盘口异动数据
    财经数据(5)-开盘啦股票标签数据爬虫
  • 原文地址:https://www.cnblogs.com/zlslch/p/6165667.html
Copyright © 2011-2022 走看看