zoukankan      html  css  js  c++  java
  • 使用MapReduce运行WordCount案例

    @

    一、准备数据

    注意:准备的数据的格式必须是文本,每个单词之间使用制表符分割。编码必须是utf-8无bom
    在这里插入图片描述

    二、MR的编程规范

    MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可!

    三、编程步骤

    ①Map阶段的核心处理逻辑需要编写在Mapper
    ②Reduce阶段的核心处理逻辑需要编写在Reducer
    ③将编写的Mapper和Reducer进行组合,组合成一个Job
    ④对Job进行设置,设置后运行

    四、编写程序

    WCMapper.java

    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    	
    	private Text out_key=new Text();
    	private IntWritable out_value=new IntWritable(1);//每个单词出现一次记为1
    	
    	// 针对输入的每个 keyin-valuein调用一次   (0,hello	hi	hello	hi)
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws Exception {
    	
    		System.out.println("keyin:"+key+"----keyout:"+value);
    		
    		String[] words = value.toString().split("	");
    		
    		for (String word : words) {
    			out_key.set(word);
    			//写出数据(单词,1)
    			context.write(out_key, out_value);
    		}
    			
    	}
    }
    

    Mapper程序解读

    1. 导包时,需注意导入 org.apache.hadoop.mapreduce包下的类(hadoop2.0的新api)

    2. 自定义的类必须符合MR的Mapper的规范

    3. 在MR中,只能处理key-value格式的数据
      KEYIN, VALUEIN: mapper输入的k-v类型,由当前Job的InputFormat的RecordReader决定!封装输入的key-value由RecordReader自动进行,不可自定义。
      KEYOUT, VALUEOUT: mapper输出的k-v类型,可自定义

    4. InputFormat的作用:
      ①验证输入目录中的文件格式,是否符合当前Job的要求
      ②生成切片,每个切片都会交给一个MapTask处理
      ③提供RecordReader,由RR从切片中读取记录,交给Mapper进行处理

    方法: List<InputSplit> getSplits: 切片
    RecordReader<K,V> createRecordReader: 创建RecordReader

    默认hadoop使用的是TextInputFormat,TextInputFormat使用LineRecordReader

    1. 在Hadoop中,如果有Reduce阶段。通常key-value都需要实现序列化协议!
      MapTask处理后的key-value,只是一个阶段性的结果!
      这些key-value需要传输到ReduceTask所在的机器!
      将一个对象通过序列化技术,序列化到一个文件中,经过网络传输到另外一台机器,
      再使用反序列化技术,从文件中读取数据,还原为对象是最快捷的方式!

    java的序列化协议: Serializable
    特点:不仅保存对象的属性值,类型,还会保存大量的包的结构,子父类和接口的继承信息,很笨重。
    hadoop开发了一款轻量级的序列化协议: Writable机制!

    序列化是什么

    • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
    • 反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象

    为什么要序列化

    一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

    WCReducer.java

    /* 
     *KEYIN, VALUEIN: Mapper输出的keyout-valueout
     *KEYOUT, VALUEOUT: 自定义		
     */		
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    	
    	private IntWritable out_value=new IntWritable();
    	
    	// reduce一次处理一组数据,key相同的视为一组
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws Exception {
    		
    		int sum=0;
    		
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    		}
    		
    		out_value.set(sum);
    		
    		//将累加的值写出
    		context.write(key, out_value);
    		
    	}
    }
    

    WCDriver.java

    /*
     * 1.启动这个线程,运行Job
     * 
     * 2.本地模式主要用于测试程序是否正确!	
     */
    public class WCDriver {
    	
    	public static void main(String[] args) throws Exception {
    		
    		Path inputPath=new Path("e:/input/wordcount");
    		Path outputPath=new Path("e:/output/wordcount");//保证输出目录不存在
    		
    		//作为整个Job的配置
    		Configuration conf = new Configuration();//空参表示默认使用本地的文件系统
    		
    		//使用HDFS,分布式文件系统
    		/*
    		Path inputPath=new Path("/wordcount");
    		Path outputPath=new Path("/mroutput/wordcount");
    		
    		conf.set("fs.defaultFS", "hdfs://hadoop101:9000");
    		
    		conf.set("mapreduce.framework.name", "yarn");// 在YARN上运行
    		
    		conf.set("yarn.resourcemanager.hostname", "hadoop102");// RM所在的机器
    		*/
    		
    		//一定要保证输出目录不存在
    		FileSystem fs=FileSystem.get(conf);
    		
    		if (fs.exists(outputPath)) {
    			fs.delete(outputPath, true);
    		}
    		
    		// ①创建Job
    		Job job = Job.getInstance(conf);
    
    		//job.setJar("MapReduce-0.0.1-SNAPSHOT.jar");// 告诉NM运行时,MR中Job所在的Jar包在哪里
    		
    		// 将某个类所在地jar包作为job的jar包
    		job.setJarByClass(WCDriver.class);
    			
    		// 为Job创建一个名字
    		job.setJobName("wordcount");
    		
    		// ②设置Job
    		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
    		job.setMapperClass(WCMapper.class);
    		job.setReducerClass(WCReducer.class);
    		
    		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
    		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		// 设置输入目录和输出目录
    		FileInputFormat.setInputPaths(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		// ③运行Job
    		job.waitForCompletion(true);
    			
    	}
    }
    
    

    注意:
    若要在yarn上运行:

    1. 设置Job所在的Jar包
    Job.setJar("jar名"); //写法一
    Job.setJarByClass(类名);//写法二
    
    1. 然后需将这三个程序打成jar包,放在集群某台机器上,使用hadoop jar命令运行
    hadoop jar jar包名 主类名(WCDriver)
    
  • 相关阅读:
    linux tar 压缩解压缩
    JS获取图片上传地址
    ipython notebook
    docker build lnmp(未完成。。。)
    centos6.7 install chrome
    centos6.6 install
    centos 安装mysqldb 记录
    centos emacs安装
    第三周-第08章节-Python3.5-文件修改详解
    第三周-第06章节-Python3.5-文件读与写详解
  • 原文地址:https://www.cnblogs.com/sunbr/p/13304763.html
Copyright © 2011-2022 走看看