zoukankan      html  css  js  c++  java
  • 使用MapReduce实现两个文件的Join操作

    数据结构

    customer表

           
    1
    hanmeimei  
    ShangHai   
    110
    2
    leilei 
    BeiJing    
    112
    3
    lucy   
    GuangZhou  
    119


    oder表

         
    1 1 50
    2 1 200
    3 3 15
    4 3 350
    5 3 58
    6 1 42
    7 1 352
    8 2 1135
    9 2 400
    10 2 2000
    11 2 300
    
    

    MAPJOIN

    场景:我们模拟一个有一份小表一个大表的场景,customer是那份小表,order是那份大表
    做法:直接将较小的数据加载到内存中,按照连接的关键字建立索引,

    大份数据作为MapTask的输入键值对 map()方法的每次输入都去内存当中直接去匹配连接。

    然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,

    每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引。

    环境配置:因为我们是在本地操作的,所以需要配置本地的hadoop

    1。下载hadoop

    2.解压到一个目录,记住,一会要用

    配置电脑环境变量

    如果你是一个初学者那么你就创建一个Java工程,步骤自己搜吧,网上一大堆然后创建一个mapjoin的包,在包里创建一个

    JoinDemo的类然后如下第24行代码,在JoinDemo后加extened往后就根据我的代码敲就好了。

     
    package mapjoin;
    ##以上内容标志了段代码是在mapjoin这个包里 import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.
    Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.*; import java.net.URI; import java.util.HashMap; import java.util.Map;

    ##以上内容是需要导入的jar包,否则一些代码会报红。
      1 public class JoinDemo extends Configured implements Tool {
      2     // customer文件在本地上的位置。
      3     // TODO: 改用参数传入
      4     private static final String CUSTOMER_CACHE_URL = "input/customer.txt"; --本地输入文件路径
      5 //            "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
      6     private static class CustomerBean {                 --
      7         private int custId;
      8         private String name;
      9         private String address;
     10         private String phone;
     11 
     12         public CustomerBean() {}
     13 
     14         public CustomerBean(int custId, String name, String address,String phone)
    {
    16 super(); 17 this.custId = custId; 18 this.name = name; 19 this.address = address; 20 this.phone = phone; 21 } 22 23 24 25 public int getCustId() { 26 return custId; 27 } 28 29 public String getName() { 30 return name; 31 } 32 33 public String getAddress() { 34 return address; 35 } 36 37 public String getPhone() { 38 return phone; 39 } 40 } 41 42 private static class CustOrderMapOutKey implements 43 WritableComparable<CustOrderMapOutKey> { 44 private int custId; 45 private int orderId; 46 47 public void set(int custId, int orderId) { 48 this.custId = custId; 49 this.orderId = orderId; 50 } 51 52 public int getCustId() { 53 return custId; 54 } 55 56 public int getOrderId() { 57 return orderId; 58 } 59 60 61 public void write( DataOutput out) throws IOException { 62 out.writeInt(custId); 63 out.writeInt(orderId); 64 } 65 66 67 public void readFields( DataInput in) throws IOException { 68 custId = in.readInt(); 69 orderId = in.readInt(); 70 } 71 72 public int compareTo(CustOrderMapOutKey o) { 73 int res = custId -o.custId; 74 return res == 0 ? orderId-o.orderId : res; 75 } 76 77 public boolean equals(Object obj) { 78 if (obj instanceof CustOrderMapOutKey) { 79 CustOrderMapOutKey o = (CustOrderMapOutKey)obj; 80 return custId == o.custId && orderId == o.orderId; 81 } else { 82 return false; 83 } 84 } 85 86 @Override 87 public String toString() { 88 return custId + " " + orderId; 89 } 90 } 91 92 private static class JoinMapper extends 93 Mapper<LongWritable, Text, CustOrderMapOutKey, Text> { 94 private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey(); 95 private final Text outputValue = new Text(); 96 97 98 /** 99 * 在内存中customer数据 100 */ 101 private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, CustomerBean>(); 102 103 @Override 104 protected void setup(Context context) 105 throws IOException, InterruptedException { 106 //读取缓存的文件 107 FileSystem fs = FileSystem 108 .get( URI.create(CUSTOMER_CACHE_URL), context.getConfiguration()); 109 FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL)); 110 111 BufferedReader reader = new BufferedReader(new InputStreamReader(fdis)); 112 String line = null; 113 String[] cols = null; 114 115 // 格式:客户编号 姓名 地址 电话 116 while ((line = reader.readLine()) != null) { 117 cols = line.split(" "); 118 if (cols.length < 4) { // 数据格式不匹配,忽略 119 continue; 120 } 121 122 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]); 123 //缓存客户信息 124 CUSTOMER_MAP.put(bean.getCustId(), bean); 125 } 126 } 127 128 @Override 129 protected void map(LongWritable key, Text value, Mapper.Context context) 130 throws IOException, InterruptedException { 131 132 // 格式: 订单编号 客户编号 订单金额 133 String[] cols = value.toString().split(" "); 134 if (cols.length < 3) { 135 return; 136 } 137 138 int custId = Integer.parseInt(cols[1]); // 取出客户编号 139 CustomerBean customerBean = CUSTOMER_MAP.get(custId); 140 141 if (customerBean == null) { // 没有对应的customer信息可以连接 142 return; 143 } 144 145 StringBuffer sb = new StringBuffer(); 146 sb.append(cols[2]) 147 .append(" ") 148 .append(customerBean.getName()) 149 .append(" ") 150 .append(customerBean.getAddress()) 151 .append(" ") 152 .append(customerBean.getPhone()); 153 154 outputValue.set(sb.toString()); 155 outputKey.set(custId, Integer.parseInt(cols[0])); 156 157 //WC 操作 158 // context.write(value, 1); 159 160 context.write(outputKey, outputValue); 161 } 162 163 } 164 165 /** 166 * reduce 167 * @author Ivan 168 * 169 */ 170 private static class JoinReducer extends 171 Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> { 172 @Override 173 protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context) 174 throws IOException, InterruptedException { 175 // 什么事都不用做,直接输出 176 for (Text value : values) { 177 context.write(key, value); 178 } 179 } 180 } 181 /** 182 * @param args 183 * @throws Exception 184 */ 185 public static void main(String[] args) throws Exception { 186 // if (args.length < 2) { 187 // new IllegalArgumentException("Usage: <inpath> <outpath>"); 188 // return; 189 // } 190 //调用 JoinDemo 对象的run方法 191 ToolRunner.run(new Configuration(), new JoinDemo(), args); 192 } 193 194 195 public int run(String[] args) throws Exception { 196 System.setProperty("hadoop.home.dir", "D:\software\hadoop-2.6.0\hadoop-2.6.0"); 197 //默认配置 198 Configuration conf = getConf(); 199 FileSystem fileSystem=FileSystem.get(conf); 200 Path outputPath=new Path("output/"); 201 if(fileSystem.exists(outputPath)){ 202 fileSystem.delete(outputPath,true); 203 } 204 205 Job job = Job.getInstance(conf); 206 job.setJarByClass(JoinDemo.class); 207 208 // 添加customer cache文件 209 job.addCacheFile(URI.create(CUSTOMER_CACHE_URL)); 210 Path inputPath=new Path("input/order.txt"); 211 // Path inputPath= new Path(args[0]) 212 FileInputFormat.addInputPath(job, inputPath); 213 214 // Path outputPath=new Path(args[1]); 215 FileOutputFormat.setOutputPath(job, outputPath); 216 217 // map settings 218 job.setMapperClass(JoinMapper.class); 219 job.setMapOutputKeyClass(CustOrderMapOutKey.class); 220 job.setMapOutputValueClass(Text.class); 221 222 // reduce settings 223 // job.setReducerClass(JoinReducer.class); 224 // job.setOutputKeyClass(CustOrderMapOutKey.class); 225 // job.setOutputKeyClass(Text.class); 226 boolean res = job.waitForCompletion(true); 227 return res ? 0 : 1; 228 } 229 }

     本地的文件位置

     最后执行结果:

               
    1 1 hanmeimei ShangHai 50 110
    2 1 hanmeimei ShangHai 200 110
    3 3 lucy GuangZhou 15 119
    4 3 lucy GuangZhou 350 119
    5 3 lucy GuangZhou 58 119
    6 1 hanmeimei ShangHai 42 110
    7 1 hanmeimei ShangHai 352 110
    8 2 leilei BeiJing 1135 112
    9 2 leilei BeiJing 400 112
    10 2 leilei BeiJing 2000 112
    11 2 leilei BeiJing 300 112
     
  • 相关阅读:
    ubuntu没有权限(不能)创建文件夹(目录)
    在ubuntu下安装KDE以及完全卸载KDE
    RadASM的主题更换!
    RadASM的测试工程!
    RadASM的测试工程!
    汇编工具安装三:已经配置好的汇编开发工具!
    汇编工具安装三:已经配置好的汇编开发工具!
    OSI 七层模型和 TCP/IP 四层模型 及 相关网络协议
    LwIP
    神秘的40毫秒延迟与 TCP_NODELAY
  • 原文地址:https://www.cnblogs.com/xuziyu/p/10803264.html
Copyright © 2011-2022 走看看