zoukankan      html  css  js  c++  java
  • hadoop学习第四天-Writable和WritableComparable序列化接口的使用&&MapReduce中传递javaBean的简单例子

    一、 为什么javaBean要继承Writable和WritableComparable接口?

    1. 如果一个javaBean想要作为MapReduce的key或者value,就一定要实现序列化,因为在Map到Reduce阶段的时候,只能是传输二进制数据,不可能将字符流直接进行RPC传输,

    只要一个javabean实现了序列化和反序列化,就可以做为key或者value

    最简单的序列化和反序列化就是实现Writable接口

    ps:javaBean在作为key的时候有点不同,除了要继承Writable接口还需要实现Comparable接口

    因为在shuffle到Reduce阶段的合并阶段,需要根据key对数据进行排序,合并,如果不实现这个接口,运行时会出错

    WritableComparable就是Writable接口和java.lang.Comparable<T>的一个子接口,所以将要作为key的javaBean直接继承WritableComparable就可以了

    2. java序列化与Writable序列化的比较

    2.1 java序列化不够灵活,为了更好的控制序列化的整个流程所以使用Writable

    2.2 java序列化不符合序列化的标准,没有做一定的压缩,java序列化首先写类名,然后再是整个类的数据,而且成员对象在序列化中只存引用,成员对象的可以出现的位置很随机,既可以在序列化的对象前,也可以在其后面,这样就对随机访问造成影响,一旦出错,整个后面的序列化就会全部错误

    2.3 Java序列化每次序列化都要重新创建对象,内存消耗大,而Writable是可以重用的

    二、 实现Writable和WritableComparable的UserBean

    代码如下:

    package com.qjx.serialize_8_2;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class UserBean implements WritableComparable<UserBean> {
        
        private int id;
        private String name ;
        private String age;
        
        public UserBean() {
        }
        
        public UserBean(int id,String name , String age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
        
        @Override
        public String toString() {
            return this.id + this.name + this.age;
        }
    
        //反序列化,将输入二进制反序列化为字符流
        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            age = in.readUTF();
        }
        
        //序列化,将字节转化为二进制输出
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeUTF(age);
        }
    
        @Override
        public int compareTo(UserBean o) {
             int thisValue = this.id;
             int thatValue = o.id;
             return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
        }
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getAge() {
            return age;
        }
    
        public void setAge(String age) {
            this.age = age;
        }
    }

    三、 MapReduce传递UserBean的一个简单例子

    我们已经实现了可序列化的UserBean类,现在就做一个简单的例子,在MapReduce中传递UserBean

    1. 准备一个文件user.txt,内容如下:

    1 'tom' '22',2 'tom2' '22',3 'tom3' '22',4 'tom4' '22',5 'tom5' '22',6 'tom6' '22',7 'tom7' '22',8 'tom8' '22',9 'tom9' '22',10 'tom10' '22',11 'tom11' '22',12 'tom12' '22',13 'tom13' '22',1 'tom' '22',1 'tom' '22',2 'tom2' '22',2 'tom2' '22',
    

    这个文件中有多个UserBean,我们的MapReduce就是要实现统计这些UserBean出现的次数

    2. WCMapper.java的实现代码:

    package com.qjx.serialize_8_2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /*  
     * Writable接口是一个实现了序列化协议的序列化对象。 
     * 在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。 
     * LongWritable类型:Hadoop.io对Long类型的封装类型 
     */  
    
    public class WCMapper extends Mapper<LongWritable, Text, UserBean, LongWritable>{
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, UserBean, LongWritable>.Context context)
    			throws IOException, InterruptedException {
    		
            // 获得每行文档内容,并且进行折分  
            String[] users = value.toString().split(",");
            
            // 遍历折份的内容  
            System.out.println(users.length);
            for (String u1 : users) {
         		   //根据空格划分为三个属性
         		String[] u = u1.toString().split(" ");
         		System.out.println(u.length);
         		if(u!=null && u.length== 3) {
    	     		UserBean u2 = new UserBean(Integer.parseInt(u[0]),u[1],u[2]);
    		     	context.write(u2, new LongWritable(1));
    	     	}
    	     	else {
    	     		System.out.println("user split false !");
    	     	}
    		}
    	}
    }
    

    3. WCReducer.java实现代码:

    package com.qjx.serialize_8_2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import com.qjx.serialize_8_2.UserBean;
    
    public class WCReducer extends Reducer<UserBean, LongWritable, UserBean, LongWritable>{
    
        @Override
        protected void reduce(UserBean key, Iterable<LongWritable> values,
                Reducer<UserBean, LongWritable, UserBean, LongWritable>.Context context) throws IOException, InterruptedException {
            
            long sum = 0;      
                for (LongWritable i : values) {
                    // i.get转换成long类型  
                    sum += i.get();  
                }
                // 输出总计结果  
                context.write(key, new LongWritable(sum));  
        }
    }

    4. UserCount.java 的实现代码:

    package com.qjx.serialize_8_2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class UserCount {
    
        public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
              // 创建job对象  
            Job job = Job.getInstance(new Configuration());  
            // 指定程序的入口  
            job.setJarByClass(UserCount.class);  
      
            // 指定自定义的Mapper阶段的任务处理类  
            job.setMapperClass(WCMapper.class);  
            job.setMapOutputKeyClass(UserBean.class);  
            job.setMapOutputValueClass(LongWritable.class);
            // 本地数据的输入路径
            FileInputFormat.setInputPaths(job, new Path("E:/trainingPack/serialize/input"));  
      
            // 指定自定义的Reducer阶段的任务处理类  
            job.setReducerClass(WCReducer.class);  
            // 设置最后输出结果的Key和Value的类型  x    
            job.setOutputKeyClass(UserBean.class);  
            job.setOutputValueClass(LongWritable.class);  
            // 将计算的结果存到本地
            FileOutputFormat.setOutputPath(job, new Path("E:/trainingPack/serialize/output"));  
      
            // 执行提交job方法,直到完成,参数true打印进度和详情  
            job.waitForCompletion(true);  
            System.out.println("Finished");  
        }
    }

    5. 执行结果,生成的output内容如下:

    1'tom''22'    3
    2'tom2''22'    3
    3'tom3''22'    1
    4'tom4''22'    1
    5'tom5''22'    1
    6'tom6''22'    1
    7'tom7''22'    1
    8'tom8''22'    1
    9'tom9''22'    1
    10'tom10''22'    1
    11'tom11''22'    1
    12'tom12''22'    1
    13'tom13''22'    1
  • 相关阅读:
    前端调用后台接口下载word文档的两种方法
    Layui form表单提交注意事项
    JavaScript_Util_01
    心理
    小例子
    SQL分割字符串
    绘制10种不同颜色的散点图
    绘制散点图
    subplot的使用
    绘制正弦余弦
  • 原文地址:https://www.cnblogs.com/qjx-2016/p/7286118.html
Copyright © 2011-2022 走看看