  • Map join and Reduce join examples

    I. Map join example

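      1. Requirement: there are two files, an order table and a product table.

      The order table has three fields: order time, product id and order id (this is the large table).

      The product table has two fields: product id and product name (this is the small table, which is loaded into memory).

      The result should be the order table with, at the end of each line, the product name that matches the line's product id.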

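      2. Approach:

      Load the product table into memory, then in the map method append to each order line the product name that matches its product id. No Reducer is needed. In the Driver class, register the product table with job.addCacheFile() and set the number of reduce tasks to 0 with job.setNumReduceTasks(0).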
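      The in-memory join described above is a hash join: build a HashMap from the small table once, then probe it once for every line of the large table. The sketch below reproduces only that logic in plain Java so it can be run without a Hadoop cluster. MapSideJoinSketch and its methods are hypothetical names, and the sketch assumes the tab-separated layouts described above (product table: product id, product name; order table: order time, product id, order id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free sketch of the map-side (in-memory hash) join.
// Assumes tab-separated lines: product = "pid\tpname", order = "time\tpid\torderId".
final class MapSideJoinSketch {

    // Counterpart of setup(): load the small product table into a HashMap once.
    static Map<String, String> loadProducts(List<String> productLines) {
        Map<String, String> pdMap = new HashMap<>();
        for (String line : productLines) {
            if (line.isEmpty()) {
                continue; // skip blank lines instead of stopping at the first one
            }
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        return pdMap;
    }

    // Counterpart of map(): probe the HashMap once per order line and append the name.
    static List<String> join(List<String> orderLines, Map<String, String> pdMap) {
        List<String> out = new ArrayList<>();
        for (String line : orderLines) {
            String pid = line.split("\t")[1];
            // Unknown product ids get an empty name instead of the string "null"
            String pName = pdMap.getOrDefault(pid, "");
            out.add(line + "\t" + pName);
        }
        return out;
    }
}
```

      For example, loading the product line "02\tpear" and then joining the order line "20190330\t02\tA1" yields "20190330\t02\tA1\tpear". The whole product table has to fit in each map task's memory, which is why this approach is only used for the small table.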

      3. Code:

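    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Product table (product id -> product name), loaded once per map task
        private final HashMap<String, String> pdMap = new HashMap<>();
        private final Text outKey = new Text();

        // 1. Load the product table (the cache file) into memory
        @Override
        protected void setup(Context context) throws IOException {
            // The file registered in the driver with job.addCacheFile()
            URI[] cacheFiles = context.getCacheFiles();
            Path pdPath = new Path(cacheFiles[0]);
            FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());

            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(pdPath), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    if (line.isEmpty()) {
                        continue; // skip blank lines instead of stopping at the first one
                    }
                    // Split: product id \t product name
                    String[] fields = line.split("\t");
                    // Cache
                    pdMap.put(fields[0], fields[1]);
                }
            }
        }

        // 2. Join each order line with its product name
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the line: order time \t product id \t order id
            String line = value.toString();
            String[] fields = line.split("\t");

            // Look up the product name by the order's product id
            String pid = fields[1];
            String pName = pdMap.getOrDefault(pid, ""); // empty if the product id is unknown

            // Append the product name and emit; with no reducer this is the final output
            outKey.set(line + "\t" + pName);
            context.write(outKey, NullWritable.get());
        }
    }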
    
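    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CacheDriver {
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            // 1. Create the job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // 2. Locate the jar by the driver class
            job.setJarByClass(CacheDriver.class);

            // 3. Set the mapper (a map join has no reducer)
            job.setMapperClass(CacheMapper.class);

            // 4. Output types; with zero reduce tasks the map output is the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // 5. Input path (order table) and output path
            FileInputFormat.setInputPaths(job, new Path("c:/table1029/in"));
            FileOutputFormat.setOutputPath(job, new Path("c:/table1029/out"));

            // 6. Register the product table as a cache file shipped to every map task
            job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));

            // 7. Map-only job: no reduce tasks
            job.setNumReduceTasks(0);

            // 8. Submit and exit with 0 on success, 1 on failure
            boolean rs = job.waitForCompletion(true);
            System.exit(rs ? 0 : 1);
        }
    }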
    

      

    II. Reduce join example

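      1. Requirement: using the same two data files, replace the product id in each order line with the corresponding product name.

      2. Approach: define a TableBean class with the fields time, product id, order id, product name and flag (the flag records which table a record came from).

        A single Mapper reads both tables. It gets the input split from the context object and uses the name of the split's file to decide which table the current line belongs to, fills a TableBean with the line's data, and sends it to the Reducer with the product id as the key and the TableBean as the value.

        All records in one reduce() call share the same key, i.e. the same product id. The Reducer uses the flag to tell the tables apart: it copies the product record (product id and name) into a single TableBean and copies the order records into an ArrayList<TableBean>. It then sets the product name on every TableBean in the list and writes each one out as the key; TableBean.toString() prints the product name in place of the product id.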
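      The tag, shuffle and reduce steps above can be simulated in plain Java to follow the data flow without Hadoop. In this sketch (ReduceSideJoinSketch and Tagged are hypothetical names), a TreeMap stands in for the shuffle by grouping tagged records by product id in sorted key order, roughly as Hadoop groups map output keys; the flag values "0" (order) and "1" (product) match the ones used in TableMapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of the reduce-side join.
// Order lines: "time\tpid\torderId"; product lines: "pid\tpname".
final class ReduceSideJoinSketch {

    // Plays the role of TableBean: a flag plus the raw fields of the line
    static final class Tagged {
        final String flag;      // "0" = order row, "1" = product row
        final String[] fields;

        Tagged(String flag, String[] fields) {
            this.flag = flag;
            this.fields = fields;
        }
    }

    static List<String> join(List<String> orderLines, List<String> productLines) {
        // "Map" + "shuffle": tag each record and group it by product id
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String line : orderLines) {
            String[] f = line.split("\t");
            groups.computeIfAbsent(f[1], k -> new ArrayList<>()).add(new Tagged("0", f));
        }
        for (String line : productLines) {
            String[] f = line.split("\t");
            groups.computeIfAbsent(f[0], k -> new ArrayList<>()).add(new Tagged("1", f));
        }

        // "Reduce": one pass per product id
        List<String> out = new ArrayList<>();
        for (List<Tagged> values : groups.values()) {
            List<String[]> orders = new ArrayList<>();
            String pName = "";
            for (Tagged t : values) {
                if (t.flag.equals("0")) {
                    orders.add(t.fields);   // buffer order rows
                } else {
                    pName = t.fields[1];    // remember the product name
                }
            }
            // Emit "time \t productName \t orderId", like TableBean.toString()
            for (String[] o : orders) {
                out.add(o[0] + "\t" + pName + "\t" + o[2]);
            }
        }
        return out;
    }
}
```

      The order rows are buffered because Hadoop does not guarantee whether the product record arrives before or after the order records inside a group; only after the whole group has been read is the product name known for certain.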

      3. Code:

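    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * @author: PrincessHug
     * @date: 2019/3/30, 2:37
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class TableBean implements Writable {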
        private String timeStamp;
        private String productId;
        private String orderId;
        private String productName;
        private String flag;
    
        public TableBean() {
        }
    
        public String getTimeStamp() {
            return timeStamp;
        }
    
        public void setTimeStamp(String timeStamp) {
            this.timeStamp = timeStamp;
        }
    
        public String getProductId() {
            return productId;
        }
    
        public void setProductId(String productId) {
            this.productId = productId;
        }
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public String getProductName() {
            return productName;
        }
    
        public void setProductName(String productName) {
            this.productName = productName;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
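        // Fields must be written in exactly the order readFields() reads them.
        // writeUTF() rejects null, so every field needs a value ("" for unused ones).
        @Override
        public void write(DataOutput out) throws IOException {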
            out.writeUTF(timeStamp);
            out.writeUTF(productId);
            out.writeUTF(orderId);
            out.writeUTF(productName);
            out.writeUTF(flag);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            timeStamp = in.readUTF();
            productId = in.readUTF();
            orderId = in.readUTF();
            productName = in.readUTF();
            flag = in.readUTF();
        }
    
        @Override
        public String toString() {
            // Final output line: order time \t product name \t order id
            return timeStamp + "\t" + productName + "\t" + orderId;
        }
    }
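      write() and readFields() form a contract: fields are read back strictly in the order they were written, and the standard DataOutputStream implementation of writeUTF() throws NullPointerException for null. That is why TableMapper sets every unused field to an empty string. The following Hadoop-free sketch (TableBeanSerDeSketch is a hypothetical helper) mirrors TableBean's five-field layout with java.io streams to show both points.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper mirroring TableBean.write()/readFields() with plain java.io.
// Field order: timeStamp, productId, orderId, productName, flag.
final class TableBeanSerDeSketch {

    static byte[] write(String[] fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String f : fields) {
                out.writeUTF(f); // NullPointerException if f is null
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written
    static String[] read(byte[] data, int fieldCount) {
        String[] fields = new String[fieldCount];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < fieldCount; i++) {
                fields[i] = in.readUTF();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fields;
    }
}
```

      Writing {"20190330", "02", "A1", "", "0"} and reading five fields back returns the same values, while passing null for the product name fails with NullPointerException.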
    
    
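    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

        // Name of the file this map task reads (a split belongs to a single file)
        private String fileName;
        private final TableBean tb = new TableBean();
        private final Text outKey = new Text();

        @Override
        protected void setup(Context context) {
            // Get the file name from the input split once per task instead of once per line
            FileSplit split = (FileSplit) context.getInputSplit();
            fileName = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");

            // Decide which table the line comes from
            if (fileName.contains("order.txt")) {
                // Order table: order time \t product id \t order id
                tb.setTimeStamp(fields[0]);
                tb.setProductId(fields[1]);
                tb.setOrderId(fields[2]);
                tb.setProductName("");   // unused fields must not be null
                tb.setFlag("0");
                outKey.set(fields[1]);
            } else {
                // Product table: product id \t product name
                tb.setTimeStamp("");
                tb.setProductId(fields[0]);
                tb.setOrderId("");
                tb.setProductName(fields[1]);
                tb.setFlag("1");
                outKey.set(fields[0]);
            }
            // Key = product id, value = tagged bean (serialized immediately, so reuse is safe)
            context.write(outKey, tb);
        }
    }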
    
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;

    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
            // All values here share one product id: the order rows plus the product row
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            TableBean productBean = new TableBean();

            // Use the flag to tell order rows ("0") from the product row ("1")
            for (TableBean v : values) {
                try {
                    if (v.getFlag().equals("0")) {
                        // Hadoop reuses the value object between iterations,
                        // so each order row is copied before it is stored
                        TableBean orderBean = new TableBean();
                        BeanUtils.copyProperties(orderBean, v);
                        orderBeans.add(orderBean);
                    } else {
                        BeanUtils.copyProperties(productBean, v);
                    }
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IOException("Failed to copy TableBean", e);
                }
            }

            // Join: set the product name; toString() prints it in place of the product id
            for (TableBean ob : orderBeans) {
                ob.setProductName(productBean.getProductName());
                context.write(ob, NullWritable.get());
            }
        }
    }
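      The copy step in TableReducer matters because, by default, Hadoop refills the same value object for each element of the values iterator instead of creating a new one. Storing v directly would leave the list holding several references to one bean that only contains the last order. The plain-Java sketch below imitates that reuse with a hypothetical ObjectReuseSketch class so the effect can be observed without a cluster.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, Hadoop-free illustration of value-object reuse during iteration.
final class ObjectReuseSketch {

    // Stand-in for TableBean, reduced to a single field
    static final class Bean {
        String orderId;
    }

    // Returns the SAME Bean instance on every next(), only changing its contents
    static Iterable<Bean> reusing(List<String> orderIds) {
        return () -> new Iterator<Bean>() {
            private final Bean shared = new Bean();
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < orderIds.size();
            }

            @Override
            public Bean next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.orderId = orderIds.get(i++);
                return shared;
            }
        };
    }

    // Buggy: keeps references, so every element is the same object
    static List<String> keepReferences(Iterable<Bean> values) {
        List<Bean> kept = new ArrayList<>();
        for (Bean v : values) {
            kept.add(v);
        }
        return ids(kept);
    }

    // Correct: copies each value into a fresh object, as BeanUtils.copyProperties does
    static List<String> keepCopies(Iterable<Bean> values) {
        List<Bean> kept = new ArrayList<>();
        for (Bean v : values) {
            Bean copy = new Bean();
            copy.orderId = v.orderId;
            kept.add(copy);
        }
        return ids(kept);
    }

    private static List<String> ids(List<Bean> beans) {
        List<String> result = new ArrayList<>();
        for (Bean b : beans) {
            result.add(b.orderId);
        }
        return result;
    }
}
```

      With order ids A1, A2 and A3, keepReferences() returns [A3, A3, A3] while keepCopies() returns [A1, A2, A3].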
    
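    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TableDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            // Jar
            job.setJarByClass(TableDriver.class);

            // Mapper and Reducer
            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);

            // Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);

            // Reducer (final) output types
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);

            // Input and output paths (forward slashes avoid invalid "\" escapes in Java strings)
            FileInputFormat.setInputPaths(job, new Path("G:/mapreduce/reducejoin/in"));
            FileOutputFormat.setOutputPath(job, new Path("G:/mapreduce/reducejoin/out"));

            // Submit the job
            if (job.waitForCompletion(true)) {
                System.out.println("Job finished!");
            } else {
                System.out.println("Job failed!");
            }
        }
    }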
    

      

  • Original post: https://www.cnblogs.com/HelloBigTable/p/10668306.html