  • Big Data Notes (8): Advanced MapReduce Features (A)

    I. Serialization

    This is similar to Java serialization: turning an object into a file.

    If a class implements the Serializable interface, its objects can be written out to a file.
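
    For contrast, here is a minimal sketch of plain Java serialization; the Person class and the file name person.ser are illustrative, not from the original post:

    import java.io.FileOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    public class JavaSerDemo {
    
        // any class implementing Serializable can be written out as a file
        static class Person implements Serializable {
            private static final long serialVersionUID = 1L;
            String name = "Tom";
            int age = 25;
        }
    
        public static void main(String[] args) throws Exception {
            try (ObjectOutputStream out =
                    new ObjectOutputStream(new FileOutputStream("person.ser"))) {
                out.writeObject(new Person()); // object ---> file
            }
        }
    }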

    Likewise, if a class implements Hadoop's serialization mechanism (the Writable interface), its objects can be used as MapReduce input and output values.

    Example: use serialization to compute the total salary of each department.

    Data flow: the map phase outputs k2 = department number and v2 = the Employee object.

    The reduce phase outputs k4 = department number; summing getSal() over the v3 Employee objects gives v4, the salary total.

    Employee.java: encapsulates the employee attributes. Employee is used only as a value (v2), so implementing Writable is enough; a key type would also need to implement WritableComparable.

    package saltotal;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    //employee attributes, one line per record, e.g.: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    public class Employee implements Writable{
    
        private int empno;       // employee number
        private String ename;    // employee name
        private String job;      // job title
        private int mgr;         // manager's employee number
        private String hiredate; // hire date
        private int sal;         // monthly salary
        private int comm;        // commission
        private int deptno;      // department number
        
        @Override
        public String toString() {
            return "[" + this.empno + "\t" + this.ename + "\t" + this.sal + "\t" + this.deptno + "]";
        }
    
        @Override
        public void write(DataOutput output) throws IOException {
            // serialization: write the fields out
            output.writeInt(this.empno);
            output.writeUTF(this.ename);
            output.writeUTF(this.job);
            output.writeInt(this.mgr);
            output.writeUTF(this.hiredate);
            output.writeInt(this.sal);
            output.writeInt(this.comm);
            output.writeInt(this.deptno);
        }
        
        @Override
        public void readFields(DataInput input) throws IOException {
            // deserialization: read the fields back in
            // note: fields must be read in exactly the same order they were written
            this.empno = input.readInt();
            this.ename = input.readUTF();
            this.job = input.readUTF();
            this.mgr = input.readInt();
            this.hiredate = input.readUTF();
            this.sal = input.readInt();
            this.comm = input.readInt();
            this.deptno = input.readInt();
        }
        
        
        public int getEmpno() {
            return empno;
        }
        public void setEmpno(int empno) {
            this.empno = empno;
        }
        public String getEname() {
            return ename;
        }
        public void setEname(String ename) {
            this.ename = ename;
        }
        public String getJob() {
            return job;
        }
        public void setJob(String job) {
            this.job = job;
        }
        public int getMgr() {
            return mgr;
        }
        public void setMgr(int mgr) {
            this.mgr = mgr;
        }
        public String getHiredate() {
            return hiredate;
        }
        public void setHiredate(String hiredate) {
            this.hiredate = hiredate;
        }
        public int getSal() {
            return sal;
        }
        public void setSal(int sal) {
            this.sal = sal;
        }
        public int getComm() {
            return comm;
        }
        public void setComm(int comm) {
            this.comm = comm;
        }
        public int getDeptno() {
            return deptno;
        }
        public void setDeptno(int deptno) {
            this.deptno = deptno;
        }
    
        
    }
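
    As a quick sanity check (not part of the original post), the write/readFields pair can be exercised locally with standard Java streams; the class name EmployeeRoundTrip is illustrative:

    package saltotal;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    
    public class EmployeeRoundTrip {
        public static void main(String[] args) throws Exception {
            Employee in = new Employee();
            in.setEmpno(7654);
            in.setEname("MARTIN");
            in.setJob("SALESMAN");
            in.setMgr(7698);
            in.setHiredate("1981/9/28");
            in.setSal(1250);
            in.setComm(1400);
            in.setDeptno(30);
    
            // serialize: write() turns the object into bytes
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            in.write(new DataOutputStream(buffer));
    
            // deserialize: readFields() rebuilds the object from those bytes
            Employee out = new Employee();
            out.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
    
            System.out.println(out); // expected (tab-separated): [7654 MARTIN 1250 30]
        }
    }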

    SalaryTotalMapper.java

    package saltotal;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // k2 = department number, v2 = Employee object
    public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee>{
    
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // input line, e.g.: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
            String data = v1.toString();
            
            // split the line into fields
            String[] words = data.split(",");
            
            // create an Employee object
            Employee e = new Employee();
            
            // employee number
            e.setEmpno(Integer.parseInt(words[0]));
            // name
            e.setEname(words[1]);
            
            // job title
            e.setJob(words[2]);
            
            // manager number: some rows have none
            try{
                e.setMgr(Integer.parseInt(words[3]));
            }catch(Exception ex){
                // missing value: default to 0
                e.setMgr(0);
            }
            
            // hire date
            e.setHiredate(words[4]);
            
            // monthly salary
            e.setSal(Integer.parseInt(words[5]));
            
            // commission: some rows have none
            try{
                e.setComm(Integer.parseInt(words[6]));
            }catch(Exception ex){
                e.setComm(0);
            }
            
            // department number
            e.setDeptno(Integer.parseInt(words[7]));
            
            
            // emit (department number, Employee object)
            context.write(new IntWritable(e.getDeptno()), e);
        }
        
    }

    SalaryTotalReducer.java

    package saltotal;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    // k3 = department number, v3 = Employee objects; k4 = department number, v4 = salary total
    public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable>{
    
        @Override
        protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
                throws IOException, InterruptedException {
            // sum the salaries in v3
            int total = 0;
            for (Employee e : v3) {
                total = total + e.getSal();
            }
            
            // emit (department number, total)
            context.write(k3, new IntWritable(total));
        }
        
    }

    SalaryTotalMain.java

    package saltotal;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SalaryTotalMain {
        public static void main(String[] args) throws Exception {
        // create a job = map + reduce
            Job job = Job.getInstance(new Configuration());
        // specify the job's entry point
            job.setJarByClass(SalaryTotalMain.class);
            
        // specify the Mapper and its output types (k2, v2)
            job.setMapperClass(SalaryTotalMapper.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Employee.class);
            
        // specify the Reducer and its output types (k4, v4)
            job.setReducerClass(SalaryTotalReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            
        // specify the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
        // run the job
            job.waitForCompletion(true);
        }
    }

    Export the jar file, upload it to the temp directory on the Linux machine, and run the job:

    hadoop jar temp/s3.jar /scott/emp.csv /output/day0301/s3
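
    To inspect the result, read the reducer output file under the output path used above. The totals in the comment are only an illustration of what the classic scott emp dataset would give:

    hdfs dfs -cat /output/day0301/s3/part-r-00000
    # with the classic scott emp data this prints something like:
    # 10      8750
    # 20      10875
    # 30      9400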

    II. Sorting

    1. Sorting numbers

      By default, MapReduce sorts records in ascending order of key2.

    Now suppose HDFS holds a file containing one number per line. We develop a MapReduce program to sort it:

    NumberMapper.java

    package mr.number;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class NumberMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
    
        @Override
        protected void map(LongWritable key1, Text value1, Context context)
                throws IOException, InterruptedException {
            // each line is a single number, e.g. 10
            String data = value1.toString().trim();
            
            // emit the number as k2; the value is not needed, so use NullWritable
            context.write(new LongWritable(Long.parseLong(data)), NullWritable.get());
        }
    }

    NumberMain.java

    package mr.number;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class NumberMain {
    
        public static void main(String[] args) throws Exception {
        // create a job = map + reduce
            Job job = Job.getInstance(new Configuration());
        // specify the job's entry point
            job.setJarByClass(NumberMain.class);
            
        // specify the mapper and its output types (k2, v2)
            job.setMapperClass(NumberMapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
            
            //job.setSortComparatorClass(MyNumberComparator.class);
            
        // specify the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
        // run the job
            job.waitForCompletion(true);
        }
    
    }

    Run the job and the output lists the numbers in ascending order. (No Reducer class is set, so Hadoop runs the default identity reducer, and the framework's shuffle sort orders the keys.)

    To change the default sort order, write your own comparator.

    Define a descending comparator class, MyNumberComparator.java:

    package mr.number;
    
    import org.apache.hadoop.io.LongWritable;
    
    // custom comparator: reverses the default ascending order
    public class MyNumberComparator extends LongWritable.Comparator{
    
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // negate the default comparison to sort in descending order
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
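
    MyNumberComparator overrides the byte-level compare inherited from LongWritable.Comparator, so Hadoop can order keys without deserializing them. Purely as an illustration (the class name DescLongComparator is made up here), an object-level version can extend WritableComparator directly; it is easier to read but slower, because each comparison deserializes both keys:

    package mr.number;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class DescLongComparator extends WritableComparator {
    
        public DescLongComparator() {
            // 'true' tells WritableComparator to create key instances, so its
            // default raw compare can deserialize the bytes and delegate to
            // the object-level compare below
            super(LongWritable.class, true);
        }
    
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b); // reverse the natural ascending order
        }
    }

    Registering it works the same way: job.setSortComparatorClass(DescLongComparator.class).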

    Uncomment this line in NumberMain.java:

    job.setSortComparatorClass(MyNumberComparator.class);

    Repackage and rerun the job; the output now lists the numbers in descending order.

    Original post: https://www.cnblogs.com/lingluo2017/p/8490825.html