zoukankan      html  css  js  c++  java
  • YARN集群的mapreduce测试(六)

    两张表链接操作(分布式缓存):

    ----------------------------------
    假设:
    其中一张A表,只有20条数据记录(比如group表)
    另外一张非常大,上亿的记录数量(比如user表)
    ----------------------------------
    策略:
    将数据集小的文件直接装载到内存,然后迭代大文件记录;

    分布式缓存的两种角度理解(针对较小数据集):
    1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
    2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;

    测试准备

    首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:

    master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;

    slave上:   先有DataNode;再有NodeManager;

    如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动: 

    hadoop-daemon.sh start datanode
    yarn-daemon.sh start nodemanager

    然后在集群中的“/data/cacheusergroup/smalldata/”目录下上传本地文件group表,在“/data/cacheusergroup/src”目录下长传本地文件user表;内容如下:

    测试目标:

    1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
    2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;

    测试代码大数据学习交流QQ群:217770236 让我们一起学习大数据

    1、直接存储到各个节点上的本地磁盘中:

    在mapper中的setup中要获取到数据在本地磁盘中的路径:

    (从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用)

    //因为StringBuilder比StringBuffer效率更高;StringBuilder是多线程的,当不存在内容修改时,是最好的选择;
     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import java.io.BufferedReader;
     4 import java.io.File;
     5 import java.io.FileInputStream;
     6 import java.io.IOException;
     7 import java.io.InputStreamReader;
     8 import java.net.URI;
     9 import java.util.HashMap;
    10 import java.util.Map;
    11 
    12 import org.apache.commons.io.IOUtils;
    13 import org.apache.hadoop.conf.Configuration;
    14 import org.apache.hadoop.fs.Path;
    15 import org.apache.hadoop.io.LongWritable;
    16 import org.apache.hadoop.io.Text;
    17 import org.apache.hadoop.mapreduce.MRJobConfig;
    18 import org.apache.hadoop.mapreduce.Mapper;
    19 import org.apache.hadoop.util.StringUtils;
    20 
    21 public class CacheGroupMapper extends Mapper<LongWritable, Text, Text, Text> {
    22     
    23     private Text outKey;
    24     private Text outValue;
    25     private Map<String, String> smallMap;
    26     
    27     @Override
    28     protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
    29             throws IOException, InterruptedException {
    30         outKey = new Text();
    31         outValue = new Text();
    32         smallMap = new HashMap<String, String>();
    33         
    34         Configuration conf = context.getConfiguration();
    35                 
    36         //从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用
    37         //获取缓存到本地磁盘的路径
    38 //        /*
    39         String[] localPaths = conf.getStrings(MRJobConfig.CACHE_LOCALFILES);
    40         Path[] path = StringUtils.stringToPath(localPaths);
    41         Path localPath = path[0];
    42         URI localUri = localPath.toUri();
    43         File fp = new File(localUri);
    44         BufferedReader br =null;
    45         String line = null; 
    46         try{
    47             //BufferedReader类写入文件的过程;读的是group表
    48             br = new BufferedReader(new InputStreamReader(new FileInputStream(fp)));
    49             while (null != (line=br.readLine()) ) {
    50                 String[] fields = line.split("\s+");
    51                 smallMap.put(fields[0], fields[1]);
    52             }
    53         }finally{
    54             IOUtils.closeQuietly(br);
    55         }
    56 //         */
    57     }
    58 
    59     @Override
    60     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    61             throws IOException, InterruptedException {
    62         String line = value.toString();
    63         String[] fields = line.split("\s+");
    64         
    65         String groupId = fields[2];//user表中第三列是groupId
    66         String groupInfo = smallMap.get(groupId);
    67         if (null == groupInfo) {
    68             return;
    69         }
    70         
    71         outKey.set(fields[2]);//这是user表中的第三列groupId
    72         //因为StringBuilder比StringBuffer效率更高;(因为map是一行一行的读取内容的不存在线程安全问题,所以使用StringBuilder是更好的选择)
    73         outValue.set(new StringBuilder(groupInfo).append("	").append(fields[0]).append("	").append(fields[1]).toString());
    74         context.write(outKey, outValue);
    75     }
    76     
    77     @Override
    78     protected void cleanup(Mapper<LongWritable, Text, Text, Text>.Context context)
    79             throws IOException, InterruptedException {
    80         outKey = null;
    81         outValue = null;
    82     }
    83     
    84 }
    CacheGroupMapper

    在reducer中直接将mapper中的输出结果汇总:

     1 package com.mmzs.bigdata.yarn.mapreduce;
     2 
     3 import java.io.IOException;
     4 import java.util.Iterator;
     5 
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.mapreduce.Reducer;
     8 
     9 public class CacheGroupReducer extends Reducer<Text, Text, Text, Text> {
    10     
    11     
    12     @Override
    13     protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
    14     }
    15 
    16     @Override
    17     protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
    18             throws IOException, InterruptedException {
    19         Iterator<Text> its = values.iterator();
    20         
    21         while (its.hasNext()) {
    22             context.write(key, its.next());
    23         }
    24         
    25     }
    26     
    27     @Override
    28     protected void cleanup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
    29     }
    30 
    31 
    32 }
    CacheGroupReducer

    在driver中添加如下语句:

    //将数据缓存到各个存储节点的本地磁盘中去;这是一种提升冗余度来提升性能的
    ugJob.addCacheFile(new URI("hdfs://master01:9000/data/cacheusergroup/smalldata/group"));
      1 package com.mmzs.bigdata.yarn.mapreduce;
      2 
      3 import java.io.IOException;
      4 import java.net.URI;
      5 import java.net.URISyntaxException;
      6 
      7 import org.apache.hadoop.conf.Configuration;
      8 import org.apache.hadoop.fs.FileSystem;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.io.LongWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     15 
     16 /**
     17  * @author hadoop
     18  *
     19  */
     20 public class CacheGroupDriver {
     21 
     22     private static FileSystem fs;
     23     private static Configuration conf;
     24     static {
     25         String uri = "hdfs://master01:9000/";
     26         conf = new Configuration();
     27         try {
     28             fs = FileSystem.get(new URI(uri), conf, "hadoop");
     29         } catch (IOException e) {
     30             e.printStackTrace();
     31         } catch (InterruptedException e) {
     32             e.printStackTrace();
     33         } catch (URISyntaxException e) {
     34             e.printStackTrace();
     35         }
     36     }
     37     
     38     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
     39         if (null==args || args.length==0) return;
     40         Job ugJob = getJob(args);
     41         if (null == ugJob) {
     42             return;
     43         }
     44         //提交Job到集群并等待Job运行完成,参数true表示将Job运行时的状态信息返回到客户端
     45         boolean flag = false;
     46         flag = ugJob.waitForCompletion(true);
     47         System.exit(flag?0:1);
     48     }
     49     
     50     /**
     51      * 获取Job实例
     52      * @param args
     53      * @return
     54      * @throws IOException
     55      * @throws URISyntaxException 
     56      */
     57     public static Job getJob(String[] args) throws IOException, URISyntaxException {
     58         if (null==args || args.length<2) return null;
     59         //放置需要处理的数据所在的HDFS路径
     60         Path inputPath = new Path(args[0]);
     61         //放置Job作业执行完成之后其处理结果的输出路径
     62         Path outputPath = new Path(args[1]);
     63 
     64         if (fs.exists(outputPath)) {
     65             fs.delete(outputPath, true);//true表示递归删除
     66         }
     67         
     68         //获取Job实例
     69         Job ugJob = Job.getInstance(conf, "UserGroupJob01");
     70         //设置运行此jar包入口类
     71         //ugJob的入口是WordCountDriver类
     72         ugJob.setJarByClass(CacheGroupDriver.class);
     73         //设置Job调用的Mapper类
     74         ugJob.setMapperClass(CacheGroupMapper.class);
     75         //设置Job调用的Reducer类(如果一个Job没有Reducer则可以不调用此条语句)
     76         ugJob.setReducerClass(CacheGroupReducer.class);
     77         
     78         //设置MapTask的输出键类型
     79         ugJob.setMapOutputKeyClass(Text.class);
     80         //设置MapTask的输出值类型
     81         ugJob.setMapOutputValueClass(Text.class);
     82         
     83         //设置整个Job的输出键类型(如果一个Job没有Reducer则可以不调用此条语句)
     84         ugJob.setOutputKeyClass(Text.class);
     85         //设置整个Job的输出值类型(如果一个Job没有Reducer则可以不调用此条语句)
     86         ugJob.setOutputValueClass(Text.class);
     87         
     88         //将数据缓存到各个存储节点的本地磁盘中去;
     89         //这是一种提升冗余度来提升性能的
     90         ugJob.addCacheFile(new URI("hdfs://master01:9000/data/cacheusergroup/smalldata/group"));
     91         
     92         //设置整个Job需要处理数据的输入路径
     93         FileInputFormat.setInputPaths(ugJob, inputPath);
     94         //设置整个Job计算结果的输出路径
     95         FileOutputFormat.setOutputPath(ugJob, outputPath);
     96         
     97         return ugJob;
     98     }
     99     
    100 }
    CacheGroupDriver

    2、直接存储到各个节点上的任务进程内存中:

    下面是直接从集群路径中读取文件缓存到本地的内存中的操作方式,第一次读取就会从集群中将数据读取,需要克服一次网络IO,然后通过文件系统FileSystem来存到JVM开辟的一块任务内存smallMap中,smallMap本就是存在内存中的,当内存不足时,会发生数据丢失

    在Mapper中的setup中要获取到数据在集群中的路径:

    A、主要是修改了Mapper中的setup方法:

     1     @Override
     2     protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
     3             throws IOException, InterruptedException {
     4         outValue = new Text();
     5         smallMap = new HashMap<String, String>();
     6         outKey=new Text();
     7         Configuration conf = context.getConfiguration();
     8         
     9         //下面是直接从集群路径中读取文件缓存到本地的内存中的操作方式
    10         //第一次读取就会从集群中将数据读取,通过文件系统FileSystem来存到内存smallMap中
    11         //smallMap本就是存在内存中的,当内存不足时,会发生数据丢失;
    12         URI[] uris = context.getCacheFiles();
    13         URI uri = uris[0];
    14         Path clusterPath = new Path(uri);
    15         FileSystem fs = clusterPath.getFileSystem(conf);
    16         
    17         InputStream fis = fs.open(clusterPath);
    18         String line=null;
    19         BufferedReader br=null;
    20         try {
    21                 br=new BufferedReader(new InputStreamReader(fis));
    22                 while(null!=(line=br.readLine())){
    23                     String[] fields=line.split("\s+");
    24                     smallMap.put(fields[0], fields[1]);
    25                 }
    26         }finally{
    27             if(br!=null) br.close();
    28         }
    29
    Mapper类的setup方法

    Reduce不做修改;Driver类也不做修改;能得到同样的结果;

    B、如果在Driver中删除如下语句:

    ugJob.addCacheFile(new URI("hdfs://master01:9000/data/cacheusergroup/smalldata/group"));

    那么Mapper中的setup方法修改为:

     1     @Override
     2     protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
     3             throws IOException, InterruptedException {
     4         outValue = new Text();
     5         smallMap = new HashMap<String, String>();
     6         outKey=new Text();
     7         Configuration conf = context.getConfiguration();
     8         
     9         //下面是直接从集群路径中读取文件缓存到本地的内存中的操作方式
    10         //第一次读取就会从集群中将数据读取,通过文件系统FileSystem来存到内存smallMap中
    11         //smallMap本就是存在内存中的,当内存不足时,会发生数据丢失;
    12         FileSystem fs = null;
    13         Path clusterPath = null;
    14         try {
    15             fs = FileSystem.get(new URI("hdfs://master01:9000/"), conf, "hadoop");
    16             clusterPath = new Path("hdfs://master01:9000/data/cacheusergroup/smalldata/group");
    17         } catch (URISyntaxException e) {
    18             e.printStackTrace();
    19         }
    20         
    21         InputStream fis = fs.open(clusterPath);
    22         String line=null;
    23         BufferedReader br=null;
    24         try {
    25                 br=new BufferedReader(new InputStreamReader(fis));
    26                 while(null!=(line=br.readLine())){
    27                     String[] fields=line.split("\s+");
    28                     smallMap.put(fields[0], fields[1]);
    29                 }
    30         }finally{
    31             if(br!=null) br.close();
    32         }
    33         
    34     }
    Mapper类的setup方法
      1 package com.mmzs.bigdata.yarn.mapreduce;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.File;
      5 import java.io.FileInputStream;
      6 import java.io.IOException;
      7 import java.io.InputStream;
      8 import java.io.InputStreamReader;
      9 import java.net.URI;
     10 import java.net.URISyntaxException;
     11 import java.util.HashMap;
     12 import java.util.Map;
     13 
     14 import org.apache.commons.io.IOUtils;
     15 import org.apache.hadoop.conf.Configuration;
     16 import org.apache.hadoop.fs.FileSystem;
     17 import org.apache.hadoop.fs.Path;
     18 import org.apache.hadoop.io.LongWritable;
     19 import org.apache.hadoop.io.Text;
     20 import org.apache.hadoop.mapreduce.MRJobConfig;
     21 import org.apache.hadoop.mapreduce.Mapper;
     22 import org.apache.hadoop.util.StringUtils;
     23 
     24 public class CacheGroupMapper extends Mapper<LongWritable, Text, Text, Text> {
     25     
     26     private Text outKey;
     27     private Text outValue;
     28     private Map<String, String> smallMap;
     29     
     30     @Override
     31     protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
     32             throws IOException, InterruptedException {
     33         outValue = new Text();
     34         smallMap = new HashMap<String, String>();
     35         outKey=new Text();
     36         Configuration conf = context.getConfiguration();
     37         
     38         //下面是直接从集群路径中读取文件缓存到本地的内存中的操作方式
     39         //第一次读取就会从集群中将数据读取,通过文件系统FileSystem来存到内存smallMap中
     40         //smallMap本就是存在内存中的,当内存不足时,会发生数据丢失;
     41         FileSystem fs = null;
     42         Path clusterPath = null;
     43         try {
     44             fs = FileSystem.get(new URI("hdfs://master01:9000/"), conf, "hadoop");
     45             clusterPath = new Path("hdfs://master01:9000/data/cacheusergroup/smalldata/group");
     46         } catch (URISyntaxException e) {
     47             e.printStackTrace();
     48         }
     49         
     50 //        URI[] uris = context.getCacheFiles();
     51 //        URI uri = uris[0];
     52 //        Path clusterPath = new Path(uri);
     53 //        FileSystem fs = clusterPath.getFileSystem(conf);
     54         
     55         InputStream fis = fs.open(clusterPath);
     56         String line=null;
     57         BufferedReader br=null;
     58         try {
     59                 br=new BufferedReader(new InputStreamReader(fis));
     60                 while(null!=(line=br.readLine())){
     61                     String[] fields=line.split("\s+");
     62                     smallMap.put(fields[0], fields[1]);
     63                 }
     64         }finally{
     65             if(br!=null) br.close();
     66         }
     67         
     68         //从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用
     69         //获取缓存到本地磁盘的路径
     70         /*
     71         String[] localPaths = conf.getStrings(MRJobConfig.CACHE_LOCALFILES);
     72         Path[] path = StringUtils.stringToPath(localPaths);
     73         Path localPath = path[0];
     74         URI localUri = localPath.toUri();
     75         File fp = new File(localUri);
     76         BufferedReader br =null;
     77         String line = null; 
     78         try{
     79             //BufferedReader类写入文件的过程;读的是group表
     80             br = new BufferedReader(new InputStreamReader(new FileInputStream(fp)));
     81             while (null != (line=br.readLine()) ) {
     82                 String[] fields = line.split("\s+");
     83                 smallMap.put(fields[0], fields[1]);
     84             }
     85         }finally{
     86             IOUtils.closeQuietly(br);
     87         }
     88         */
     89     }
     90 
     91     @Override
     92     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
     93             throws IOException, InterruptedException {
     94         //如果没有setup中的语句,那么每次map都会克服磁盘IO和网络IO        
     95         String line = value.toString();
     96         String[] fields = line.split("\s+");
     97         
     98         String groupId = fields[2];//user表中第三列是groupId
     99         String groupInfo = smallMap.get(groupId);
    100         if (null == groupInfo) {
    101             return;
    102         }
    103         
    104         outKey.set(fields[2]);//这是user表中的第三列groupId
    105         //因为StringBuilder比StringBuffer效率更高;(因为map是一行一行的读取内容的不存在线程安全问题,所以使用StringBuilder是更好的选择)
    106         outValue.set(new StringBuilder(groupInfo).append("	").append(fields[0]).append("	").append(fields[1]).toString());
    107         context.write(outKey, outValue);
    108     }
    109     
    110     @Override
    111     protected void cleanup(Mapper<LongWritable, Text, Text, Text>.Context context)
    112             throws IOException, InterruptedException {
    113         outKey = null;
    114         outValue = null;
    115     }
    116     
    117 }
    CacheGroupMapper

    Reduce不做修改;也能得到同样的结果;

     如果没有setup中的语句,那么每次map都会克服磁盘IO和网络IO;性能消耗很大,不利于效率的提升;所以出现了缓存。

    测试结果:

    运行时传入参数是:

    如果在客户端eclipse上运行:传参需要加上集群的master的uri即 hdfs://master01:9000

    输入路径参数:  /data/cacheusergroup/src

    输出路径参数:  /data/cacheusergroup/dst

     测试过程中,忘记初始化outValue,导致输出没结果,希望道友不要犯类似的小错误。

  • 相关阅读:
    Cocoa中对日期和时间的处理 NSCalendar (一)
    iOS优秀博客收录
    八大排序(内容是转载链接)
    IOS8定位无效问题
    ios 里如何判断当前应用的定位服务是否可用
    MKMapView和MKMapViewDelegate
    根据经纬度翻译成详细位置的各种方法
    iOS 根据经纬度反查 地名
    MapKit学习笔记
    解决Collection <__NSArrayM: 0xb550c30> was mutated while being enumerated
  • 原文地址:https://www.cnblogs.com/mmzs/p/8064930.html
Copyright © 2011-2022 走看看