zoukankan      html  css  js  c++  java
  • 9.3.1 map端连接- DistributedCache分布式缓存小数据集

    1.1.1         map端连接- DistributedCache分布式缓存小数据集

    当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据。用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作。

    (1)   分布式缓存指定缓存文件

    执行命令行时,采用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat input/data/all output

    -files input/cityfile/tb_dim_city.dat指定需要缓存的文件,会被复制到各个节任务点。

    (2)指定缓存文件的三种类型

     Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中,任务运行前,节点管理器从分布式文件系统中复制文件到本地。

     1) -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。

     2) -archives 选项向自己的任务中复制存档(压缩)文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。

     3) -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。

    (3)缓存文件删除机制

    节点管理器为缓存中的文件各维护一个计数器,任务运行时,文件计数器加1,任务完成后,计数器减1,计数器为0时才能删除文件,当节点缓存容量大于一定值(yarn.nodemanger.localizer.cache.target-size-mb设置,默认10GB),才会删除最近最少使用的文件。

    (4)Job的分布式缓存API

    除了可以用命令行参数指定缓存文件外,还以通过Job的API指定缓存文件;即通过job对象调用下面的函数设置缓存文件。

    //以下两组方法将文件或存档添加到分布式缓存

    public void addCacheFile(URI uri);

    public void addCacheArchive(URI uri);

    //以下两组方法将一次性向分布式缓存中添加一组文件或存档

    public void setCacheFiles(URI[] files);

    public void setCacheArchives(URI[] archives);

    //以下两组方法将文件或存档添加到 MapReduce 任务的类路径

    public void addFileToClassPath(Path file);

    public void addArchiveToClassPath(Path archive);

    public void createSymlink();

    (6)DistributedCache缓存小数据集实现hadoop map端连接实例

    下面的实例是将城市名称的数据集和用户信息的数据集进行连接,城市名称的数据集很小,而用户信息的数据集很大,所以可以采用缓存文件的方式,将城市信息数据集发送到任务,map任务通过setup方法从缓存中读取小数据集文件tb_dim_city.dat,在内存中形成map映射,map函数处理用户信息数据,根据用户信息中的城市id去map映射中找到城市名称,然后合并输出。

    package Temperature;
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     *
     *
    用途说明:  
     * Map side join中的left outer join  
     * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  
     * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),  
     * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":  
     * id     name  orderid  city_code  is_show  
     * 0       其他        9999     9999         0  
     * 1       长春        1        901          1  
     * 2       吉林        2        902          1  
     * 3       四平        3        903          1  
     * 4       松原        4        904          1  
     * 5       通化        5        905          1  
     * 6       辽源        6        906          1  
     * 7       白城        7        907          1  
     * 8       白山        8        908          1  
     * 9       延吉        9        909          1  
     * -------------------------风骚的分割线-------------------------------  
     * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  
     * tb_user_profiles.dat文件内容,分隔符为"|":  
     * userID   network     flow    cityID  
     * 1           2G       123      1  
     * 2           3G       333      2  
     * 3           3G       555      1  
     * 4           2G       777      3  
     * 5           3G       666      4  
     * -------------------------风骚的分割线-------------------------------  
     *  结果:  
     *  1   长春  1   901 1   1   2G  123  
     *  1   长春  1   901 1   3   3G  555  
     *  2   吉林  2   902 1   2   3G  333  
     *  3   四平  3   903 1   4   2G  777  
     *  4   松原  4   904 1   5   3G  666  
     */
    public class MapSideJoinMain extends Configured implements Tool{
        private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
        public static class LeftOutJoinMapper extends Mapper {

            private HashMap city_info = new HashMap<String,String>();
            private Text outPutKey = new Text();
            private Text outPutValue = new Text();
            private String mapInputStr = null;
            private String mapInputSpit[] = null;
            private String city_secondPart = null;
            /**
             *
    此方法在每个task开始之前执行,这里主要用作从DistributedCache  
             * 中取到tb_dim_city文件,并将里边记录取出放到内存中。  
             */
           
    @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException {
                BufferedReader br = null;
                //获得当前作业的DistributedCache相关文件  
                Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String cityInfo = null;
                for(Path p : distributePaths){
                    if(p.toString().endsWith("tb_dim_city.dat")){
                        //读缓存文件,并放到mem中  
                        br = new BufferedReader(new FileReader(p.toString()));
                        while(null!=(cityInfo=br.readLine())){
                            String[] cityPart = cityInfo.split("\|",5);
                            if(cityPart.length ==5){
                                city_info.put(cityPart[0], cityPart[1]+" "+cityPart[2]+" "+cityPart[3]+" "+cityPart[4]);
                            }
                        }
                    }
                }
            }

            /**
             * Map
    端的实现相当简单,直接判断tb_user_profiles.dat中的  
             * cityID是否存在我的map中就ok了,这样就可以实现Map Join了  
             */
           
    protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                //排掉空行  
                if(value == null || value.toString().equals("")){
                    return;
                }
                mapInputStr = value.toString();
                mapInputSpit = mapInputStr.split("\|",4);
                //过滤非法记录  
                if(mapInputSpit.length != 4){
                    return;
                }
                //判断链接字段是否在map中存在  
                city_secondPart = (String) city_info.get((Object) mapInputSpit[3]);
                if(city_secondPart != null){
                    this.outPutKey.set(mapInputSpit[3]);
                    this.outPutValue.set(city_secondPart+" "+mapInputSpit[0]+" "+mapInputSpit[1]+" "+mapInputSpit[2]);
                    context.write(outPutKey, outPutValue);
                }
            }
        }
       
        public int run(String[] args) throws Exception {
            Configuration conf=getConf(); //获得配置文件对象  
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件  
            Job job=new Job(conf,"MapJoinMR");
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径  
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

            job.setJarByClass(MapSideJoinMain.class);
            job.setMapperClass(LeftOutJoinMapper.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式  
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型  
            job.setMapOutputKeyClass(Text.class);

            //设置reduce的输出key和value类型  
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);
            return job.isSuccessful()?0:1;
        }
        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            try {
                int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);
                System.exit(returnCode);
            } catch (Exception e) {
                // TODO Auto-generated catch block  
               
    logger.error(e.getMessage());
            }
        }
    }

    实例参考文献:

    https://www.cnblogs.com/cssdongl/p/6018806.html

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

  • 相关阅读:
    HDU2013 蟠桃记
    HDU2012 素数判定
    I00030 Grades conversion
    HDU2011 多项式求和
    HDU2009 求数列的和
    HDU2005 第几天?【日期计算】
    HDU2004 成绩转换
    HDU2006 求奇数的乘积
    HDU2007 平方和与立方和【序列处理】
    HDU2010 水仙花数【进制+趣味程序】
  • 原文地址:https://www.cnblogs.com/bclshuai/p/12319471.html
Copyright © 2011-2022 走看看