zoukankan      html  css  js  c++  java
  • 大数据学习(二)-------- MapReduce

    前提已经安装好hadoop的hdfs集群,可以查看

    https://www.cnblogs.com/tree1123/p/10683570.html

    Mapreduce是hadoop的运算框架,可以对hdfs中的数据分开进行计算,先执行很多maptask,在执行reducetask,这个过程中任务的执行需要一个任务调度的平台,就是yarn。

    一、安装YARN集群

    yarn集群中有两个角色:

    主节点:Resource Manager  1台

    从节点:Node Manager   N台

    Resource Manager一般安装在一台专门的机器上

    Node Manager应该与HDFS中的data node重叠在一起

    修改配置文件:yarn-site.xml

    <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>主机名</value>
    </property>
    
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>
    
    <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>2048</value>
    </property>
    
    <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>2</value>
    </property>
    

    然后scp到所有机器,修改主节点hadoop的slaves文件,列入要启动nodemanager的机器,配好免密

    然后,就可以用脚本启动yarn集群:

    sbin/start-yarn.sh

    停止:

    sbin/stop-yarn.sh

    页面:http://主节点:8088 看看node manager节点是否识别

    开发一个提交job到yarn的客户端类,mapreduce所有jar和自定义类,打成jar包上传到hadoop集群中的任意一台机器上,运行jar包中的(YARN客户端类

    hadoop jar ......JobSubmitter

    二、开发mapreduce程序
    注意理解分而治之的思想,先进行map:映射,对应,个数不变。 reduce:化简,合并,将一系列数据,化简为一个值。

    主要需要开发:

    map阶段的进、出数据,

    reduce阶段的进、出数据,

    类型都应该是实现了HADOOP序列化框架的类型,如:

    String对应Text

    Integer对应IntWritable

    Long对应LongWritable

    例子wordcount代码:

    WordcountMapper

    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    	
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    
    		// 切单词
    		String line = value.toString();
    		String[] words = line.split(" ");
    		for(String word:words){
    			context.write(new Text(word), new IntWritable(1));
    			
    		}
    	}
    }
    

    WordcountReducer

    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    	
    	
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
    	
    		
    		int count = 0;
    		
    		Iterator<IntWritable> iterator = values.iterator();
    		while(iterator.hasNext()){
    			
    			IntWritable value = iterator.next();
    			count += value.get();
    		}
    		
    		context.write(key, new IntWritable(count));
    		
    	}
    
    }
    
    
    
    
    
    
    
    public class JobSubmitter {
    	
    	public static void main(String[] args) throws Exception {
    		
    		// 在代码中设置JVM系统参数,用于给job对象来获取访问HDFS的用户身份
    		System.setProperty("HADOOP_USER_NAME", "root");
    		
    		
    		Configuration conf = new Configuration();
    		// 1、设置job运行时要访问的默认文件系统
    		conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
    		// 2、设置job提交到哪去运行
    		conf.set("mapreduce.framework.name", "yarn");
    		conf.set("yarn.resourcemanager.hostname", "hdp-01");
    		// 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
    		conf.set("mapreduce.app-submission.cross-platform","true");
    		
    		Job job = Job.getInstance(conf);
    		
    		// 1、封装参数:jar包所在的位置
    		job.setJar("d:/wc.jar");
    		//job.setJarByClass(JobSubmitter.class);
    		
    		// 2、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类
    		job.setMapperClass(WordcountMapper.class);
    		job.setReducerClass(WordcountReducer.class);
    		
    		// 3、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		
    		
    		Path output = new Path("/wordcount/output");
    		FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
    		if(fs.exists(output)){
    			fs.delete(output, true);
    		}
    		
    		// 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
    		FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
    		FileOutputFormat.setOutputPath(job, output);  // 注意:输出路径必须不存在
    		
    		
    		// 5、封装参数:想要启动的reduce task的数量
    		job.setNumReduceTasks(2);
    		
    		// 6、提交job给yarn
    		boolean res = job.waitForCompletion(true);
    		
    		System.exit(res?0:-1);
    		
    	}
    	
    	
    
    }
    

    MR还有一些高级的用法:自定义类型,自定义Partitioner,Combiner,排序,倒排索引,自定义GroupingComparator

    三、mapreduce与yarn的核心机制

    yarn是一个分布式程序的运行调度平台

    yarn中有两大核心角色:

    1、Resource Manager

    接受用户提交的分布式计算程序,并为其划分资源

    管理、监控各个Node Manager上的资源情况,以便于均衡负载

    2、Node Manager

    管理它所在机器的运算资源(cpu + 内存)

    负责接受Resource Manager分配的任务,创建容器、回收资源

    Mapreduce工作机制:

    划分输入切片——》 环形缓冲区 ——》 分区排序 ——》Combiner 局部聚合——》shuffle ——》GroupingComparator——》输出

  • 相关阅读:
    暑假假期周进度报告(第七周)
    暑假假期周进度报告(第五周)
    暑假假期周进度报告(第二周)
    接口 interface 实现接口implements 抽象类 abstract 继承extends
    标准JavaBean
    IO FileInputStream 输入字节流类(读文件) FileOutputStream 输出字节流(写文件) 复制文件
    如何获取任意范围内的一个随机数?
    hadoop 简介
    创建Student数组和Student对象,键盘录入
    云计算01
  • 原文地址:https://www.cnblogs.com/tree1123/p/10711141.html
Copyright © 2011-2022 走看看