zoukankan      html  css  js  c++  java
  • 大数据学习之编程案例Join操作14

    一:mapjoin(Map端的Join操作)

    任务?将商品表和订单表进行关联起来(相当于MySQL中的等值连接)

    提示:这里其实是对2个表进行操作。我们这里其实运用到了分布式缓存,将商品表进行了缓存

    数据预览:

     

    Step1:在Order表中对应的01 Id和 商品的01 id和商品名表关联起来,

    Step2:然后将id对应的名字输出添加到订单表的每一行的最后

    输出样式: 201801 01 1  苹果

    代码编写

    Map阶段

    package it.dawn.MR案列.MapJoin;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.commons.lang.StringUtils;
    
    /**
     * @author Dawn
     * @date 2019年5月12日22:25:38
     * @version 1.0
     * 2个表进行操作,有点像MySql中的多表查询的等值连接
     * 
     */
    
    //思路:商品表加载到内存中  然后数据在map端输出前 进行替换
    public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    	
    	public static FileSystem fs=null;
    	public static String hdfs="hdfs://192.168.40.111:9000";
    	
    	HashMap<String, String> pdMap=new HashMap<>();
    	//1.商品表加载到内存
    		protected void setup(Context context) throws IOException, InterruptedException {
    			
    //			//加载缓存文件
    //			BufferedReader br = new BufferedReader(new FileReader("pd"));
    			
    			//读取失败。直接读取文件系统里的文件算了
    			Configuration conf=new Configuration();
    			try {
    				fs=FileSystem.get(new URI(hdfs), conf, "root");
    			} catch (URISyntaxException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    			
    			//1 拿到流
    			FSDataInputStream in=fs.open(new Path("/input/MRclassic/pd.txt"));
    			
    			//2.缓冲流
    			BufferedReader br=new BufferedReader(new InputStreamReader(in, "UTF-8"));
    			
    			String line;
    			
    			while(StringUtils.isNotEmpty(line = br.readLine()) ) {
    				
    				//切分
    				String[] fields = line.split("	");
    				
    				//缓存
    				pdMap.put(fields[0], fields[1]);
    				
    			}
    			
    			br.close();
    			fs.close();
    		
    		}
    	
    	//2.map传输
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		//获取数据
    		String line=value.toString();
    		
    		//切割
    		String[] fields = line.split("	");
    		
    		//获取订单中商品id
    		String pid=fields[1];
    		
    		//根据订单商品id获取商品名
    		String pName=pdMap.get(pid);
    		
    		//拼接数据
    		line = line+pName;
    		
    		//输出
    		context.write(new Text(line), NullWritable.get());
    	}
    
    	
    	
    	
    
    }
    

      

    编写驱动类(没有Reduce阶段):

    package it.dawn.MR案列.MapJoin;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @author Dawn
     * @date 2019年5月12日22:44:27
     * @version 1.0
     * 
     * 这里没有写Reduce,可以不用写,也可以写一个走个过场
     * 不写的话要设置一下reducetask的数量,默认是 1
     * 
     * 
     */
    public class CacheDriver {
    	
    	private static String inPath="/input/MRclassic/order.txt";
    	private static String outPath="/input/MRclassic/out1";
    	private static String hdfs="hdfs://bigdata111:9000";
    	
    	public static void main(String[] args) throws IllegalArgumentException, IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
    
    		// 1.获取job信息
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", hdfs);
    		Job job = Job.getInstance(conf);
    
    		//添加分布式缓存
    //		job.addCacheFile(new URI((cache)+"#pd"));
    		// 2.获取jar包
    		job.setJarByClass(CacheDriver.class);
    
    		// 3.获取自定义的mapper与reducer类
    		job.setMapperClass(CacheMapper.class);
    
    		// 5.设置reduce输出的数据类型(最终的数据类型)
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 6.设置输入存在的路径与处理后的结果路径
    		FileInputFormat.setInputPaths(job, new Path(inPath));
    		FileOutputFormat.setOutputPath(job, new Path(outPath));
    		
    		
    		
    		
    		//设置一下reducetask的数量
    		job.setNumReduceTasks(0);
    
    		// 7.提交任务
    		boolean rs = job.waitForCompletion(true);
    		System.out.println(rs ? 0 : 1);
    	}
    	
    
    }
    

      

    运行结果:

    Bug处理:

    这里我用分布式缓存失败了。直接在SetUp方法中读取的文件,暂时解决我们的需求。并未在Driver类中添加job.addCacheFile(new URI((cache)+"#pd"));这一行代码

    后面再慢慢解决这个吧!!

    二 :Reduce端的join操作

    需求?

    根据商品表和订单表,把订单表中商品ID 替换成 商品表中对应的商品名

    输出样式如下

     201801 苹果 1  

    代码编写:

    序列化,封装表

    package it.dawn.MR案列.ReduceJoin;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class TableBean implements Writable{
    	
    	//封装对应字段
    	private String order_id;//订单id
    	private String pid;//产品id
    	private int amount;//产品数量
    	private String pname;//产品名称
    	private String flag;//判断是订单表还是商品表
    	
    	public TableBean() {
    		super();
    	}
    	
    	public String getOrder_id() {
    		return order_id;
    	}
    	public void setOrder_id(String order_id) {
    		this.order_id = order_id;
    	}
    	public String getPid() {
    		return pid;
    	}
    	public void setPid(String pid) {
    		this.pid = pid;
    	}
    	public int getAmount() {
    		return amount;
    	}
    	public void setAmount(int amount) {
    		this.amount = amount;
    	}
    	public String getPname() {
    		return pname;
    	}
    	public void setPname(String pname) {
    		this.pname = pname;
    	}
    	public String getFlag() {
    		return flag;
    	}
    	public void setFlag(String flag) {
    		this.flag = flag;
    	}
    	
    	//序列化
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(order_id);
    		out.writeUTF(pid);
    		out.writeInt(amount);
    		out.writeUTF(pname);
    		out.writeUTF(flag);
    		
    	}
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		order_id=in.readUTF();
    		pid=in.readUTF();
    		amount=in.readInt();
    		pname=in.readUTF();
    		flag=in.readUTF();
    	}
    
    	@Override
    	public String toString() {
    		return order_id+"	"+pname+"	"+amount;
    	}
    
    	
    }
    

      

    Map阶段

    package it.dawn.MR案列.ReduceJoin;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
    
    	@Override
    	protected void map(LongWritable key, Text value,Context context)
    			throws IOException, InterruptedException {
    		TableBean v = new TableBean();
    		Text k = new Text();
    		
    		//区分两张表
    		FileSplit inputSplit = (FileSplit)context.getInputSplit();//拿到切片信息
    		String name=inputSplit.getPath().getName();
    		
    		//获取数据
    		String line=value.toString();
    		
    		//区分 此时是订单表
    		if(name.contains("order.txt")) {
    			//切分字段
    			String[] fields=line.split("	");
    			
    			//封装对象
    			v.setOrder_id(fields[0]);
    			v.setPid(fields[1]);
    			v.setAmount(Integer.parseInt(fields[2]));
    			v.setPname("");
    			v.setFlag("0");
    			
    			//设置k 商品id作为k
    			k.set(fields[1]);
    		}else {
    			//此时是商品表
    			//切分字段
    			String[] fields = line.split("	");
    			
    			//封装对象
    			v.setOrder_id("");
    			v.setPid(fields[0]);
    			v.setAmount(0);
    			v.setPname(fields[1]);
    			v.setFlag("1");
    			
    			//设置k 商品id
    			k.set(fields[0]);
    		}
    		
    		
    		context.write(k, v);
    		
    	}
    	
    	
    	
    
    }
    

      

    Reduce阶段:

    package it.dawn.MR案列.ReduceJoin;
    
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * @author Dawn
     * @date 2019年5月13日15:29:36
     * @version 1.0
     * 思路?
     * 注意一点。我们Map阶段出来的相同的key。这里key相同说明商品表中的pid=订单表中的pid
     * 
     * 1:将读取的一个个订单对象放到一个集合中
     * 
     * 2:把pd中商品名 拷贝到一个orderBean中(这里就只需要一个orderBean对象就行了。不用想像传来的订单对象那样放在一个集合中)
     * 
     *:3:最后tableBean.setPname(pdBean.getPname());
     */
    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable>{
    
    	@Override
    	protected void reduce(Text key, Iterable<TableBean> values,
    			Context context) throws IOException, InterruptedException {
    		//创建一个集合,用来存放数据
    		ArrayList<TableBean> orderList = new ArrayList<>();
    		
    		//商品存储
    		TableBean pdBean = new TableBean();//把pd中商品名 拷贝到orderBean
    		
    		for(TableBean v: values) {
    			if("0".equals(v.getFlag())) {//订单表
    				//1.创建一个临时变量 拷贝数据
    				TableBean tableBean = new TableBean();
    				
    				//2:拷贝
    				try {
    					BeanUtils.copyProperties(tableBean, v);
    				} catch (IllegalAccessException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				} catch (InvocationTargetException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    				
    				//添加到集合中
    				orderList.add(tableBean);
    			}else {
    				//直接拷贝到pdBean对象中,应为key相同,所以肯定只有一个
    				try {
    					BeanUtils.copyProperties(pdBean, v);
    				} catch (IllegalAccessException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				} catch (InvocationTargetException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    			
    		}
    		//拼接表
    		for(TableBean order:orderList) {
    			//加入商品名
    			order.setPname(pdBean.getPname());
    			context.write(order, NullWritable.get());
    		}
    	}
    	
    	
    
    }
    

      

    编写Driver类:

    package it.dawn.MR案列.ReduceJoin;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @author Dawn
     * @date 2019年5月13日15:03:57
     * @version 1.0
     * 需求?
     * 根据商品表和订单表,把订单表中商品ID 替换成 商品表中对应的商品名
     * 输出样式如下:
     * 201801	苹果	1  
     */
    public class TableDriver {
    	
    	//输入
    	private static String inPath="/input/MRclassic/*.txt";
    	//输出
    	private static String outPath="/input/MRclassic/out2";
    	//hdfs地址
    	private static String hdfs="hdfs://bigdata111:9000";
    	
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		// 1.获取job信息
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", hdfs);
    		Job job = Job.getInstance(conf);
    
    		// 2.获取jar包
    		job.setJarByClass(TableDriver.class);
    
    		// 3.获取自定义的mapper与reducer类
    		job.setMapperClass(TableMapper.class);
    		job.setReducerClass(TableReducer.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(TableBean.class);
    
    		// 5.设置reduce输出的数据类型(最终的数据类型)
    		job.setOutputKeyClass(TableBean.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 6.设置输入存在的路径与处理后的结果路径
    		FileInputFormat.setInputPaths(job, new Path(inPath));
    		FileOutputFormat.setOutputPath(job, new Path(outPath));
    
    		// 7.提交任务
    		boolean rs = job.waitForCompletion(true);
    		System.out.println(rs ? 0 : 1);
    	}
    
    }
    

      

    运行结果如下:

  • 相关阅读:
    20180831 租房子(管理员) 增加 删除 修改 发布
    租房子(用户登录) 最主要的 查!!!!!!(只显示查的页面代码)
    20180825 ajax PHP html js 实现 三级联动(省 市 区)
    20180823 连接数据库 把数据库的内容呈现到js上面 重点牢记
    20180821 PHP环境下 数组 函数
    20180820 PHP 基本环境 变量 常量 数据类型 运算符
    移动端h5列表页上拉分页加载更多
    JS中的if和else的用法以及基础语法
    JS数组方法大全
    7.7PHP所学知识总结
  • 原文地址:https://www.cnblogs.com/hidamowang/p/10860361.html
Copyright © 2011-2022 走看看