zoukankan      html  css  js  c++  java
  • MapReduce实例&YARN框架

    MapReduce实例&YARN框架


    一个wordcount程序

    统计一个相当大的数据文件中,每个单词出现的个数。

    一、分析map和reduce的工作

    map:

    1. 切分单词
    2. 遍历单词数据输出

    reduce:

    对从map中得到的数据的valuelist遍历累加,得到一个单词的总次数

    二、代码

    WordCountMapper(继承Mapper)

    重写Mapper类的map方法。

    mapreduce框架每读一行数据就调用一次该方法,map的具体业务逻辑就写在这个方法体中。

    1. map和reduce的数据输入输出都是以key-value对的形式封装的
    2. 4个泛型中,前两个(KEYIN, VALUEIN)指定mapper输入数据的类型, 后两个(KEYOUT, VALUEOUT)指定输出数据的类型
    3. 默认情况下,框架传递给mapper的输入数据中,key是要处理的文本中一行的起始偏移量,value是这行的内容
    4. 由于输入输出在结点中通过网络传递,数据需要序列化,但JDK自带的序列化机制会有附加信息冗余,对于大量数据传输不合适,因此 <Long, String, String, Long> -> <LongWritable, Text, Text, LongWritable>
    5. 业务中要处理的数据已经作为参数key-value被传递进来了,处理后的输出是调用context.write()写入到context
    package cn.thousfeet.hadoop.mapreduce.wordcount;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    	
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
    			throws IOException, InterruptedException {
    		
    		String line = value.toString();
    		
    		String[] words = StringUtils.split(line," "); //切分单词
    		
    		for(String word : words) //遍历 输出为key-value( <word,1> )
    		{
    			context.write(new Text(word), new LongWritable(1));
    		}
    	
    	}
    	
    }
    
    

    WordCountReducer(继承Reducer)

    重写Reducer类的reduce方法。

    框架在map处理完成后,将所有的key-value对缓存起来进行分组,然后传递到一个组 <key,values{}> (对于wordcount程序,拿到的就是类似 <hello,{1,1,1,1...}>),然后调用一次reduce方法。

    package cn.thousfeet.hadoop.mapreduce.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
    	@Override
    	protected void reduce(Text key, Iterable<LongWritable> valueList,
    			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    		
    		long count = 0;
    		
    		for(LongWritable value : valueList) //遍历value list累加求和
    		{
    			count += value.get();
    		}
    		
    		context.write(key, new LongWritable(count)); //输出这一个单词的统计结果
    	}
    }
    
    

    WordCountRunner

    用于描述job。

    比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce。还可以指定该作业要处理的数据所在的路径,和输出的结果放到哪个路径。

    package cn.thousfeet.hadoop.mapreduce.wordcount;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    public class WordCountRunner {
    
    		public static void main(String[] args) throws Exception {
    			
    			Configuration conf = new Configuration();
    			Job job = Job.getInstance(conf);
    			
    			//设置整个job所用的那些类在哪个jar包
    			job.setJarByClass(WordCountRunner.class);
    			
    			//指定job使用的mapper和reducer类
    			job.setMapperClass(WordCountMapper.class);
    			job.setReducerClass(WordCountReducer.class);
    			
    			//指定reduce和mapper的输出数据key-value类型
    			job.setOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(LongWritable.class);
    			
    			//指定mapper的输出数据key-value类型
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(LongWritable.class);
    			
    			//指定原始输入数据的存放路径
    			FileInputFormat.setInputPaths(job, new Path("/wordcount/srcdata/"));
    			
    			//指定处理结果数据的存放路径
    			FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));
    		
    			//将job提交给集群运行 参数为true时会打印运行进度
    			job.waitForCompletion(true);
    		}
    }
    
    

    上传到集群中运行

    export成一个jar包,上传到虚拟机上。

    分发到集群运行:hadoop jar wordcount.jar cn.thousfeet.hadoop.mapreduce.wordcount.WordCountRunner

    查看输出结果:

    (可以看到按key的字典序升序排序)


    MapReduce程序几种不同的提交运行模式

    方式一:本机的JVM运行

    首先,因为要在windows下直接调试,需要在eclipse的设置 Run Configurations->arguments->vm arguments ,添加-DHADOOP_USER_NAME=对应用户

    如需在本地直接run main方法(MapReduce程序在本机的JVM运行),要把输入输出路径改为hdfs全路径或把site.xml配置文件拖进来(或用在windows本地目录下的数据也行,MapReduce程序的运行和数据来源在哪无关)。

    方式二:本地debug实际运行在集群

    如需实现在本地run main方法而MapReduce实际运行在集群(这种方式必须在linux下),应:

    1. 将mapred-site.xml和yarn-site.xml拖到工程的src目录下(或给conf配置mapreduce.framework.nameyarn.resourcemanager.hostname等参数)
    2. 给工程导出一个jar包(比如放在工程目录下),配置该job的jar包的路径conf.set("mapreduce.job.jar","wordcount.jar");

    (在windows下要用这种方法需要修改hadoop的YarnRunner这个类的源码,或者安装插件什么的..)

    提交到yarn集群的job可以在yarn的管理页面(8088端口)看到。


    yarn框架的运行机制

    yarn只负责资源的分配,然后启动运算框架的主管进程AppMaster(如运算框架是MapReduce时主管进程就是它的MRAppMaster),剩下的工作就不由yarn去做了。

    MapReduce只适合做数据的批量离线处理,而不适用于实时性的需求,要实现实时性要使用的运算框架是spark、storm那些,但都可以放在yarn框架下。yarn和运算框架分离的策略使得hadoop具有广泛的实用性和生命力。


    yarn提交job的流程(关键源码)


    坑点

    org.apache.hadoop.security.AccessControlException

    运行程序后查看output文件夹能看到运行成功了,但是cat查看part-r-00000的时候报错

    error creating legacy BlockReaderLocal. Disabling legacy local reads.
    org.apache.hadoop.security.AccessControlException: Can't continue with getBlockLocalPathInfo() authorization. The user thousfeet is not configured in dfs.block.local-path-access.user

    解决方法是hdfs-site.xml中的配置项dfs.client.read.shortcircuit=false
    woc,这个参数其实原本默认就是false...突然想起这不是上次配置出错的时候病急乱投医加上的吗,果然乱跟教程害死人orzz

    (参考:http://www.51testing.com/html/59/445759-821244.html)

  • 相关阅读:
    [MongoDB] Remove, update, create document
    [AngularJS + RxJS] Search with RxJS
    [Redux] Passing the Store Down with <Provider> from React Redux
    [Redux] Passing the Store Down Implicitly via Context
    [Redux] Passing the Store Down Explicitly via Props
    [Cycle.js] Generalizing run() function for more types of sources
    [Redux] Extracting Container Components -- Complete
    [Redux] Redux: Extracting Container Components -- AddTodo
    视觉暂留:视觉暂留
    人物-发明家-贝尔:亚历山大·贝尔
  • 原文地址:https://www.cnblogs.com/thousfeet/p/8671160.html
Copyright © 2011-2022 走看看