zoukankan      html  css  js  c++  java
  • Mapreduce自定义数据类型

    Hadoop自带的数据类型

    IntWritable, LongWritable, Text, xxWritable 等。

     

    某些情况下:使用自定义的数据类型方便一些(类似java中的pojo)。

    实现:

    实现WritableComparable接口即可。

    场景例如:

    成绩表:由语文,数学,英文组成。

    上传到hdfs上score目录下一个score.txt文件--文件内容如下:

    想让按照总成绩进行排名。如果成绩相同,则按照语文,数学,英文来排序。

    一、自定义ScoreWritable实现WritableComparable接口:

    package com.day07;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.*;
    public class ScoreWritable implements WritableComparable<ScoreWritable> {
        int chinese;
        int math;
        int english;
        int sum;
    
        public ScoreWritable() {
        }
    
        public ScoreWritable(int chinese, int math, int english) {
            this.chinese = chinese;
            this.math = math;
            this.english = english;
            this.sum=chinese+english+math;
        }
    
        @Override
        public String toString() {
            return "ScoreWritable{" +
                    "chinese=" + chinese +
                    ", math=" + math +
                    ", english=" + english +
                    ", sum=" + sum +
                    '}';
        }
    
        public int getChinese() {
            return chinese;
        }
    
        public void setChinese(int chinese) {
            this.chinese = chinese;
        }
    
        public int getMath() {
            return math;
        }
    
        public void setMath(int math) {
            this.math = math;
        }
    
        public int getEnglish() {
            return english;
        }
    
        public void setEnglish(int english) {
            this.english = english;
        }
    
        public int getSum() {
            return sum;
        }
    
        public void setSum(int sum) {
            this.sum = sum;
        }
        //比较
        public int compareTo(ScoreWritable that) {
            //先比较总成绩
            if (this.sum>that.getSum()){
                return -1;
            }else if(this.sum<that.getSum()){
                return 1;
            }else{
                if (this.chinese>that.getChinese()){
                    return -1;
                }else if (this.chinese<that.getChinese()){
                    return 1;
                }else {
                    return -(this.math-that.getMath());
                }
            }
        }
        //序列化--dataOutput(data流):可以自定义序列化对象,节省空间,hadoop用的就是这个流
        public void write(DataOutput out) throws IOException {
            out.writeInt(chinese);
            out.writeInt(math);
            out.writeInt(english);
            out.writeInt(sum);
        }
        //反序列化
        public void readFields(DataInput in) throws IOException {
            this.chinese = in.readInt();
            this.math = in.readInt();
            this.english = in.readInt();
            this.sum = in.readInt();
        }
    }
    

    注意:

    最好实现toString方法。

     

    二、编写ScoreJob类用于测试自定义的ScoreWritable

    package com.day07;
    
    import com.day03.MaxSaleJob;
    import com.google.common.io.Resources;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Driver that sorts score records by ScoreWritable's comparator.
     * Reads "/score/" from HDFS, writes sorted results to "/out/".
     */
    public class ScoreJob {
        /**
         * Parses one CSV line "chinese,math,english" into a ScoreWritable
         * key; the shuffle then sorts keys via ScoreWritable.compareTo.
         */
        public static class ScoreMapper extends Mapper<LongWritable, Text, ScoreWritable, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString().trim();
                // Skip blank or malformed records instead of failing the task
                // on parseInt / ArrayIndexOutOfBounds.
                if (line.isEmpty()) {
                    return;
                }
                String[] grades = line.split(",");
                if (grades.length < 3) {
                    return;
                }
                ScoreWritable score = new ScoreWritable(
                        Integer.parseInt(grades[0].trim()),
                        Integer.parseInt(grades[1].trim()),
                        Integer.parseInt(grades[2].trim()));
                context.write(score, NullWritable.get());
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));

            // Define the job.
            Job job = Job.getInstance(coreSiteConf, "score");
            job.setJarByClass(ScoreJob.class);
            job.setMapperClass(ScoreMapper.class);
            // Map output types. No reducer class is set: the default identity
            // reducer emits keys in the sorted order produced by the shuffle.
            job.setMapOutputKeyClass(ScoreWritable.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Input path, and output path (deleting a stale output dir first,
            // since MapReduce refuses to overwrite an existing one).
            FileInputFormat.addInputPath(job, new Path("/score/"));
            FileSystem fileSystem = FileSystem.get(coreSiteConf);
            Path outDir = new Path("/out/");
            if (fileSystem.exists(outDir)) {
                fileSystem.delete(outDir, true);
            }
            FileOutputFormat.setOutputPath(job, outDir);

            // Run the job and, on success, print the result file.
            boolean flag = job.waitForCompletion(true);
            if (flag) {
                // try-with-resources closes the stream (the original leaked
                // it), and copyBytes streams the whole file rather than
                // relying on available(), which is not the total length.
                try (FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000"))) {
                    IOUtils.copyBytes(open, System.out, 4096, false);
                }
            }
        }
    }
    

    三、测试结果,类似于以下内容

  • 相关阅读:
    10-多线程笔记-2-锁-3-Lock-4-工具类
    09-多线程笔记-2-锁-3-Lock-3-ReadWriteLock
    08-多线程笔记-2-锁-3-Lock-2-Lock
    07-多线程笔记-2-锁-3-Lock-1-AQS
    空闲时间无聊写的一个软著源代码文档生成器
    Centos7.x创建lvm
    cups API
    debezium 使用踩坑
    hive 行列转换
    mac 上docker 容器动态暴露端口
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305597.html
Copyright © 2011-2022 走看看