zoukankan      html  css  js  c++  java
  • MaperReduce实验

    MaperReduce实现WordCount程序二次排序

    前期准备

    • 启动Zookeeper

    ./zkServer.sh start
    
    • 启动HDFS

    start-dfs.sh 
    
    • 启动Yarn

    start-yarn.sh
    
    • 将要处理的数据文件上传到HDFS

      ##这里数据文件名为wordcount.txt,目标存放路径为hdfs:/wc/srcdata/
      ##在HDFS根目录下创建wc目录
      hadoop fs -mkdir /wc 
      
      ##创建srcdata目录
      hadoop fs -mkdir /wc/srcdata
      
      ##上传wordcount.txt
      hadoop fs -put wordcount.txt /wc/srcdata
      
      ##不需要创建输出目录,否则会报错
      
    

    1. 工程结构

    • 导入common核心包及其依赖包
    • 导入mapreduce包及其依赖包
    • 导入tools包及其依赖

    2. 编写自定义NewKey类

    package cn.hadoop.mr.wordcount;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /*
     * newKey是自定义的数据类型,要在hadoop的各个节点之间传输,应该遵循hadoop的序列化机制
     * 就必须实现hadoop相应的序列化接口
     */
    
    public class NewKey implements WritableComparable<NewKey>{
    	
    	private String word;
    	
    	
    	//反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
    	public NewKey() {}
    	
    	
    
    	//初始化对象
    	public NewKey(String word) {
    		this.word = word;
    	}
    
    	public String getWord() {
    		return word;
    	}
    
    	public void setWord(String word) {
    		this.word = word;
    	}
    
    	//从数据流中反序列化出对象的数据
    	//从数据流中读出对象字段时,必须与序列化时的顺序一致
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		
    				word = in.readUTF();
    	}
    
    	//将对象数据序列化到流中
    	@Override
    	public void write(DataOutput out) throws IOException {
    		
    		out.writeUTF(word);		
    	}
    	
    	@Override
    	public String toString() {
    		
    		return word;
    	}
    
    	//实现倒序排序
    	@Override
    	public int compareTo(NewKey o) {
    		return o.getWord().compareToIgnoreCase(word);
    	}
    	
    	
    
    }
    
    

    3. 编写WCMapper类

    package cn.hadoop.mr.wordcount;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    //四个泛型中,前两个时指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUE是输入的value的类型
    //map和reduce的数据输入输出都是以key-value对的形式封装的
    //默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
    //mapreduce框架将Long类型封装为可序列化的LongWriteble类型,String封装为Text
    public class WCMapper extends Mapper<LongWritable, Text, NewKey, LongWritable>{
    	
    	
    	//mapreduce框架每读一行数据就调用一次该方法
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    		
    		//具体的业务处理逻辑在该方法中编写,业务处理所需的数据由框架进行传递,在方法的参数中体现key-value
    		//key是这一行数据的起始偏移量,value是这一行的文本内容
    		
    		//将这一行内容转化为string类型
    		String line=value.toString();
    		
    		//调用Hadoop工具类对这一行文本按特定分隔符切分
    		String[] words = StringUtils.split(line," ");
    		
    		//遍历单词数组输出到context中,输出为key-value形式,key为单词,value为1
    		for(String word:words) {
    			context.write(new NewKey(word), new LongWritable(1));
    		}
    		
    		
    	}
    }
    
    

    4. 编写WCReduer类

    package cn.hadoop.mr.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WCReucer extends Reducer<NewKey, LongWritable, NewKey, LongWritable>{
    	
    	
    	//框架在map处理完成之后,缓存所有k-v对,根据k进行分组(相同k为同一组)后传递<key,value{}>,对每一组k调用一次reduce方法
    	//<hello,{1,1,1,1....}>
    	@Override
    	protected void reduce(NewKey key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    		
    		long count =0;
    		//遍历values进行累加求和
    		for(LongWritable value : values) {
    			
    			count += value.get();
    		}
    		
    		//输出这一个单词的统计结果
    		
    		context.write(key, new LongWritable(count));
    	}
    
    	
    
    }
    
    

    5. 编写作业描述类

    package cn.hadoop.mr.wordcount;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * 用来描述一个特定的作业
     * 例如:
     * 指定该作业所使用的map类和reduce类;
     * 指定作业所需输入数据的存放路径;
     * 指定作业输出结果存放路径
     * @author Administrator
     *
     */
    public class WCRunner extends Configured implements Tool{
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    
    		//配置文件
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    		
    		//设置整个job所调用类的jar包路径
    		job.setJarByClass(WCRunner.class);
    		
    		//设置该作业所使用的mapper和reducer类
    		job.setMapperClass(WCMapper.class);
    		job.setReducerClass(WCReucer.class);
    		
    		//指定mapper输出数据的k-v类型,和reduce输出类型一样,可缺省
    		job.setMapOutputKeyClass(NewKey.class);
    		job.setOutputValueClass(LongWritable.class);
    		
    		//指定reduce输出数据的k-v类型
    		job.setOutputKeyClass(NewKey.class);
    		job.setOutputValueClass(LongWritable.class);
    		
    		//指定输入数据存放路径
    		FileInputFormat.setInputPaths(job, new Path(arg0[0]));
    		
    		//指定输出数据存放路径
    		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
    		
    		//将job提交给集群运行,参数为true表示提示运行进度
    		return job.waitForCompletion(true)?0:1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		
    		int res = ToolRunner.run(new Configuration(), new WCRunner(), args);
    		
    		System.exit(res);
    	}
    
    }
    
    

    6. 将工程打包

    注:工程所使用的JDK版本必须和Hadoop所使用的JDK版本一致

    ``

    8. 查看输出结果

    ##查看指定的输出路径是否生成文件
    hadoop fs -ls /wc/output
    
    ##查看运行结果
    
    hadoop fs -cat /wc/output/part-r-00000
    
    

    结果如图所示

  • 相关阅读:
    7、8月刷题总结
    【POJ】2828 Buy Tickets(线段树+特殊的技巧/splay)
    [LeetCode] 459. Repeated Substring Pattern 重复子字符串模式
    [LeetCode] 268. Missing Number 缺失的数字
    [LeetCode] 190. Reverse Bits 翻转二进制位
    [LeetCode] 275. H-Index II H指数 II
    [LeetCode] 274. H-Index H指数
    [LeetCode] 387. First Unique Character in a String 字符串的第一个唯一字符
    [LeetCode] 415. Add Strings 字符串相加
    [LeetCode] 220. Contains Duplicate III 包含重复元素 III
  • 原文地址:https://www.cnblogs.com/liminghuang/p/9075208.html
Copyright © 2011-2022 走看看