zoukankan      html  css  js  c++  java
  • map侧连接

    两个数据集中一个非常小,可以让小数据集存入缓存。在作业开始这些文件会被复制到运行task的节点上。 一开始,它的setup方法会检索缓存文件。

    与reduce侧连接不同,Map侧连接需要等待参与连接的数据集满足如下条件:

    1.除了连接键外,所有的输入都必须按照连接键排序。 输入的各种数据集必须有相同的分区数。 所有具有相同键的记录需要放在同一分区中。 当Map任务对其他Mapreduce作业的结果进行处理时(Cleanup时),Map侧的连接条件都自动满足 CompositeInputFormat类用于执行Map侧的连接,而输入和连接类型的配置可以通过属性指定。

    2.如果其中一个数据集足够小,旁路的分布式通道可以用在Map侧的连接中。

    实例:

    输入:   

       123(工厂)                       a(地址表):

    Beijing Red Star,1                      1,Beijing

    Shenzhen Thunder,3                       2,Guangzhou

    Guangzhou Honda,2                     3,Shenzhen

    Beijing Rising,1                        4,xian   

    Guangzhou Development Bank,2

    Tencent,3

    Back of Beijing,1

    思路:在map端中的cache载入地址表,在map阶段的setup()中,定义HashMap(),将字符串分割,放入HashMap中,然后在map阶段,利用hashmap。get(),得到对应的地址。

    代码:

    package mapreduce01;

    import java.io.IOException;

    import java.net.URI;

    import java.util.HashMap;

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.LineReader;

    public class Mapduan {

    static String INPUT_PATH = "hdfs://master:9000/qq/123";

    static String OUTPUT_PATH="hdfs://master:9000/output";

    static class MyMapper extends Mapper<Object,Object,Text,Text>{

    Text output_key = new Text();

    Text output_value = new Text();

    Map<String,String> addMap = new HashMap<String,String>();   //image  yingshe

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{

    URI uri=context.getCacheFiles()[0];

    Path path = new Path(uri);

    FileSystem fs = path.getFileSystem(context.getConfiguration());

    LineReader lineReader = new LineReader(fs.open(path));

    Text line=new Text();

    while(lineReader.readLine(line)>0){

    String tokens[] = line.toString().split(",");

    if(tokens!=null && tokens.length==2)

         addMap.put(tokens[0], tokens[1]);                

    }

    }

    protected void map(Object key,Object value,Context context) throws IOException,InterruptedException{

    String[] tokens = value.toString().split(",");

    if(tokens!=null&&tokens.length==2){

    output_key.set(tokens[0]);

    String addrName = addMap.get(tokens[1].toString());

    output_value.set(addrName);

    context.write(output_key,output_value);

    }

    }

    }

    static class MyReduce extends Reducer<Text,Text,Text,Text> {

    Text  output_key=new Text();

    Text  output_value=new Text();

    protected void reduce(Text key, Iterable<Text> values,Context context)  throws IOException,InterruptedException{

    context.write(key,values.iterator().next());

    }

    }

    public static void main(String[] args) throws Exception{

    Path outputpath = new Path(OUTPUT_PATH);

    Path cacheFile = new Path("hdfs://master:9000/qq/a");

    Configuration conf = new Configuration();

    FileSystem fs = outputpath.getFileSystem(conf);

    if(fs.exists(outputpath)){

    fs.delete(outputpath,true);

    }

     Job  job=Job.getInstance(conf);

     FileInputFormat.setInputPaths(job,INPUT_PATH);

     FileOutputFormat.setOutputPath(job, outputpath);

     URI uri =cacheFile.toUri();

     job.setCacheFiles(new URI[]{uri});  //set cache address

     job.setMapperClass(MyMapper.class);

     job.setReducerClass(MyReduce.class);   

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(Text.class);

     job.waitForCompletion(true);

    }

    }

    实验结果:

    Back of Beijing Beijing

    Beijing Red Star Beijing

    Beijing Rising Beijing

    Guangzhou Development Bank Guangzhou

    Guangzhou Honda Guangzhou

    Shenzhen Thunder Shenzhen

    Tencent Shenzhen

  • 相关阅读:
    Shiro
    Python活力练习Day11
    Python活力练习Day10
    Python活力练习Day9
    数据框DataFrame和列表List相互转换
    Python活力练习Day8
    Python中绘制箭头
    Python活力练习Day7
    Python活力练习Day6
    Python活力练习Day5
  • 原文地址:https://www.cnblogs.com/luminous1/p/8386916.html
Copyright © 2011-2022 走看看