zoukankan      html  css  js  c++  java
  • map侧连接

    两个数据集中一个非常小,可以让小数据集存入缓存。在作业开始这些文件会被复制到运行task的节点上。 一开始,它的setup方法会检索缓存文件。

    与reduce侧连接不同,Map侧连接需要等待参与连接的数据集满足如下条件:

    1.除了连接键外,所有的输入都必须按照连接键排序。 输入的各种数据集必须有相同的分区数。 所有具有相同键的记录需要放在同一分区中。 当Map任务对其他Mapreduce作业的结果进行处理时(Cleanup时),Map侧的连接条件都自动满足 CompositeInputFormat类用于执行Map侧的连接,而输入和连接类型的配置可以通过属性指定。

    2.如果其中一个数据集足够小,旁路的分布式通道可以用在Map侧的连接中。

    实例:

    输入:   

       123(工厂)                       a(地址表):

    Beijing Red Star,1                      1,Beijing

    Shenzhen Thunder,3                       2,Guangzhou

    Guangzhou Honda,2                     3,Shenzhen

    Beijing Rising,1                        4,xian   

    Guangzhou Development Bank,2

    Tencent,3

    Back of Beijing,1

    思路:在map端中的cache载入地址表,在map阶段的setup()中,定义HashMap(),将字符串分割,放入HashMap中,然后在map阶段,利用hashmap。get(),得到对应的地址。

    代码:

    package mapreduce01;

    import java.io.IOException;

    import java.net.URI;

    import java.util.HashMap;

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.LineReader;

    public class Mapduan {

    static String INPUT_PATH = "hdfs://master:9000/qq/123";

    static String OUTPUT_PATH="hdfs://master:9000/output";

    static class MyMapper extends Mapper<Object,Object,Text,Text>{

    Text output_key = new Text();

    Text output_value = new Text();

    Map<String,String> addMap = new HashMap<String,String>();   //image  yingshe

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{

    URI uri=context.getCacheFiles()[0];

    Path path = new Path(uri);

    FileSystem fs = path.getFileSystem(context.getConfiguration());

    LineReader lineReader = new LineReader(fs.open(path));

    Text line=new Text();

    while(lineReader.readLine(line)>0){

    String tokens[] = line.toString().split(",");

    if(tokens!=null && tokens.length==2)

         addMap.put(tokens[0], tokens[1]);                

    }

    }

    protected void map(Object key,Object value,Context context) throws IOException,InterruptedException{

    String[] tokens = value.toString().split(",");

    if(tokens!=null&&tokens.length==2){

    output_key.set(tokens[0]);

    String addrName = addMap.get(tokens[1].toString());

    output_value.set(addrName);

    context.write(output_key,output_value);

    }

    }

    }

    static class MyReduce extends Reducer<Text,Text,Text,Text> {

    Text  output_key=new Text();

    Text  output_value=new Text();

    protected void reduce(Text key, Iterable<Text> values,Context context)  throws IOException,InterruptedException{

    context.write(key,values.iterator().next());

    }

    }

    public static void main(String[] args) throws Exception{

    Path outputpath = new Path(OUTPUT_PATH);

    Path cacheFile = new Path("hdfs://master:9000/qq/a");

    Configuration conf = new Configuration();

    FileSystem fs = outputpath.getFileSystem(conf);

    if(fs.exists(outputpath)){

    fs.delete(outputpath,true);

    }

     Job  job=Job.getInstance(conf);

     FileInputFormat.setInputPaths(job,INPUT_PATH);

     FileOutputFormat.setOutputPath(job, outputpath);

     URI uri =cacheFile.toUri();

     job.setCacheFiles(new URI[]{uri});  //set cache address

     job.setMapperClass(MyMapper.class);

     job.setReducerClass(MyReduce.class);   

     job.setOutputKeyClass(Text.class);

     job.setOutputValueClass(Text.class);

     job.waitForCompletion(true);

    }

    }

    实验结果:

    Back of Beijing Beijing

    Beijing Red Star Beijing

    Beijing Rising Beijing

    Guangzhou Development Bank Guangzhou

    Guangzhou Honda Guangzhou

    Shenzhen Thunder Shenzhen

    Tencent Shenzhen

  • 相关阅读:
    课堂作业04 2017.10.27
    课程作业 03 动手动脑 2017.10.20
    课程作业 03 2017.10.20
    HDU 3974 Assign the task
    POJ 2155 Matrix
    POJ 2481 Cows
    HDU 3038 How Many Answers Are Wrong
    CS Academy Array Removal
    POJ_1330 Nearest Common Ancestors LCA
    CF Round 427 D. Palindromic characteristics
  • 原文地址:https://www.cnblogs.com/luminous1/p/8386916.html
Copyright © 2011-2022 走看看