zoukankan      html  css  js  c++  java
  • Hadoop基础学习(一)分析、编写并执行WordCount词频统计程序

    版权声明:本文为博主原创文章,未经博主同意不得转载。

    https://blog.csdn.net/jiq408694711/article/details/34181439

    前面已经在我的Ubuntu单机上面搭建好了伪分布模式的HBase环境,当中包含了Hadoop的执行环境。

    详见我的这篇博文:http://blog.csdn.net/jiyiqinlovexx/article/details/29208703

    我的目的主要是学习HBase,下一步打算学习的是将HBase作为Hadoop作业的输入和输出。

    可是好像曾经在南大上学时学习的Hadoop都忘记得几乎相同了,所以找到曾经上课做的几个实验:wordCount,PageRank以及InversedIndex。

    发现曾经写的实验报告还是蛮具体的。非常easy看懂。恰好曾经做实验用的也是hadoop0.20的版本号,所以依照我曾经写的实验手冊直接操作,熟悉一下Hadoop了。


    以下是我曾经写的WordCOunt的实验报告:

    一、实验要求:

    实验内容与要求
    1. 在Eclipse环境下编写WordCount程序,统计全部除Stop-Word(如a, an, of, in, on, the, this, that,…)外全部出现次数k次以上的单词计数。最后的结果依照词频从高到低排序输出
    2. 在集群上执行程序,对莎士比亚文集文档数据进行处理
    3. 可自行建立一个Stop-Word列表文件。当中包含部分停词就可以,不须要列出全部停词;參数k作为输入參数动态指定(如k=10)
    4. 实验结果提交:要求书写一个实验报告,当中包含:
    实验设计说明。包含主要设计思路、算法设计、程序和各个类的设计说明
    程序执行和实验结果说明和分析
    性能、扩展性等方面存在的不足和可能的改进之处
    源程序 。执行程序,停词列表文件
    执行结果文件


    二、实验报告:

    Wordcount词频统计实验

    201241星期日

    19:04

    1设计思路

    Map:

    (1)停词存储

    由于停词比較少。所以选择将他们全部存储到内存中,停词不能有反复,还须要高速訪问。所以选择hashset来存储

    (2)map

    对于map传进来的每一行文本。首先用正在表达式将英文标点符号全部题换成空格,然后在循环分析每个单词,假设这个单词不包含在停词集合中。则将其key设为单词本身,值设置为1。并发射出去。


    Reduce:

    reduce中对每个key,将其全部value累加起来。

    假设value不小于某个词频。则将其output出去。

     

    2遇到的问题

    (1)hadoop API问题

    (2)hadoopmap中读取hdfs文件内容

    (3)怎样按词频从高究竟输出;

    解决1

    參考非常多资料。找到正确使用的API,总之感觉hadoop不同版本号之间API非常混乱。

    API方面有两个点花费我非常多时间。一是mapreduce的初始化函数setup。二是向mapreduce传递參数直接通过configuration来进行,有点相似于JSP中的session

    解决2

    開始我在map之外定义一个全局变量,開始的时候将停词文件路径复制给这个全局变量。可是在map里面无法读取这个文件的内容。不知道怎么回事。

    然后我们在网上查了一下,发现有一个分布式缓存文件的类DistributedCache

    主要先是获取停词文件的路径,将其增加到cache中去,DistributedCache.addCacheFile(newPath(args[++i]).toUri(), conf);

    然后在map中用DistributedCache.getLocalCacheFiles(context.getConfiguration());读取该文件路径,这样就能够读取停词文件的内容了。

    解决3

    词频要在reduce完毕之后才干计算出来。也就是说尽管map之后将键值对分发到reduce之前会依照键值进行一个sort的过程,可是我们也无法借助将key

    value掉换的方法一次进行。

    我们小组一起讨论,想到了在第一次mapreduce统计完词频之后再进行一次mapreduce来依照词频对全部键值对排序。统计结果(中间结果)存放到暂时文件夹中。


    在第二次mapreduce的过程中:

    (1)InverseMapper

    在网上查了一下,hadoop本身就有一个将键值对颠倒顺序的了一个mapper。名字叫做InverseMapper,在交换了键值之后。另一个问题。

    (2)setSortComparatorClass

    hadoop中默认对IntWritable类型的key是以升序排列的,我们是要依照降序,所以重写sort过程中进行key值比較所參考的比較类。使用setSortComparatorClass方法设置比較类。

    (3)setNumReduceTasks(1)

    至于reduce部分我们无须指定不论什么reduce。由于不须要做不论什么操作。仅仅须要指定将全部键值对发送到一个reduce就可以。

     

    3执行过程

    首先利用scp命令将停词文件以及wordcount的可执行jar传输到集群的mater01节点上面去。然后使用ssh命令登录到该节点:

     

    然后在hdfs上面我们小组的文件夹下创建一个wordcount文件夹。以及子文件夹input

    然后使用hadoopfs -put命令将停词文件复制到该文件夹下:

    然后以节点上本来就有的/data/shakespear文件夹以下的数据作为输入,将我们小组的/wordcount/output(不存在)作为输入。执行參数为-skip指定听此文件路径,-greater指定要统计的最低词频的单词。来执行wordcount

    wordcount/output文件夹以下生成结果文件part-r-00000。打开之后发现结果和预期全然一致。词频从高到低,最低词频为10,而且在统计之前已经将标点符号去掉。

    能够用hadoop fs -get /wordcount/output/part-r-00000 .命令下载到当前文件夹。

    也能够打开浏览器:http://localhost:50070/dfshealth.jsp。选择Browse the filesystem来直接查看HDFS上面文件的内容。



    在浏览器中打开:http://localhost:50030/jobtracker.jsp,查看工作执行状态以及结果:

     

    4源程序,停词文件,可执行jar文件均參见本文件夹里。


    =====================================================================================================

    注意,我由于如今是在本机上面执行Hadoop作业,而不是像曾经那样在远端master机器上面跑。所以有些地方不一样。

    比方利用scp将wordcount.jar传到master机器上,以及用ssh登陆这些都不须要。

    可是停词文本集合还是要上传到HDFS。还有之前实验莎士比亚文集的数据是老师已经放在HDFS上了,所以不须要我们上传,这些要自己将莎士比亚文集的数据上传到HDFS。命令是:

    hadoop fs -put /shakespeare /data/shakespare


    源代码

    说实话。看着曾经的图片。发现跑起来蛮快的,可是如今单机真心慢。。

    停词文本文件和莎士比亚文集数据有时间上传到百度云盘,这里先把代码贴出来供大家參考。

    /**
     * WordCount
     * jiyq@seg.nju.edu.cn - 季义钦
     * 统计输入文件各个单词出现频率
     * 统计的时候对于“停词”(从文本文件读入)将不參与统计
     * 最后按统计的词频从高究竟输出
     * 
     *  特别主import某个类的时候。确定你是要用哪个包所属的该类
     *  
     * */
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.*;
    
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
            
    public class WordCount {
    	
    	
    	/**
    	 * Map: 将输入的文本数据转换为<word-1>的键值对
    	 * */
    	public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    		
    		String regex = "[.,"!--;:?'\]]"; //remove all punctuation
    		Text word = new Text();
    		final static IntWritable one = new IntWritable(1);
    		HashSet<String> stopWordSet = new HashSet<String>();
    		
    		/**
    		 * 将停词从文件读到hashSet中
    		 * */
    		private void parseStopWordFile(String path){
    			try {
    				String word = null;
    				BufferedReader reader = new BufferedReader(new FileReader(path));
    				while((word = reader.readLine()) != null){
    					stopWordSet.add(word);
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}	
    		}
    		
    		/**
    		 * 完毕map初始化工作
    		 * 主要是读取停词文件
    		 * */
    		public void setup(Context context) {			
    			
    			Path[] patternsFiles = new Path[0];
    			try {
    				patternsFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    			} catch (IOException e) {
    				e.printStackTrace();
    			}			
    			if(patternsFiles == null){
    				System.out.println("have no stopfile
    ");
    				return;
    			}
    			
    			//read stop-words into HashSet
    			for (Path patternsFile : patternsFiles) {
    				parseStopWordFile(patternsFile.toString());
    			}
    		}  
    		
    		/**
    		 *  map
    		 * */
    		public void map(LongWritable key, Text value, Context context) 
    			throws IOException, InterruptedException {
    			
    			String s = null;
    			String line = value.toString().toLowerCase();
    			line = line.replaceAll(regex, " "); //remove all punctuation
    			
    			//split all words of line
    			StringTokenizer tokenizer = new StringTokenizer(line);
    			while (tokenizer.hasMoreTokens()) {
    				s = tokenizer.nextToken();
    				if(!stopWordSet.contains(s)){
    					word.set(s);
    					context.write(word, one);
    				}				
    			}
    		}
    	}
    	
    	/**
    	 * Reduce: add all word-counts for a key
    	 * */
    	public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    		
    		int min_num = 0;
    		
    		/**
    		 * minimum showing words
    		 * */
    		public void setup(Context context) {
    			min_num = Integer.parseInt(context.getConfiguration().get("min_num"));
    			System.out.println(min_num);
    		}
    		
    		/**
    		 * reduce
    		 * */
    		public void reduce(Text key, Iterable<IntWritable> values, Context context)	
    			throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable val : values) {
    				sum += val.get();
    			}
    			if(sum < min_num) return;
    			context.write(key, new IntWritable(sum));
    		}
    	}
    	
    	/**
    	 * IntWritable comparator
    	 * */
    	private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
            
    	      public int compare(WritableComparable a, WritableComparable b) {
    	    	  return -super.compare(a, b);
    	      }
    	      
    	      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    	          return -super.compare(b1, s1, l1, b2, s2, l2);
    	      }
    	}
    	
    	/**
    	 * main: run two job
    	 * */
    	public static void main(String[] args){
    		
    		boolean exit = false;
    		String skipfile = null; //stop-file path
    		int min_num = 0;
    		String tempDir = "wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
    		
    		Configuration conf = new Configuration();
    		
    		//获取停词文件的路径。并放到DistributedCache中
    	    for(int i=0;i<args.length;i++)
    	    {
    			if("-skip".equals(args[i]))
    			{
    				DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
    				System.out.println(args[i]);
    			}			
    		}
    	    
    	    //获取要展示的最小词频
    	    for(int i=0;i<args.length;i++)
    	    {
    			if("-greater".equals(args[i])){
    				min_num = Integer.parseInt(args[++i]);
    				System.out.println(args[i]);
    			}			
    		}
    	    
    		//将最小词频值放到Configuration中共享
    		conf.set("min_num", String.valueOf(min_num));	//set global parameter
    		
    		try{
    			/**
    			 * run first-round to count
    			 * */
    			Job job = new Job(conf, "jiq-wordcountjob-1");
    			job.setJarByClass(WordCount.class);
    			
    			//set format of input-output
    			job.setInputFormatClass(TextInputFormat.class);
    			job.setOutputFormatClass(SequenceFileOutputFormat.class);
    			
    			//set class of output's key-value of MAP
    			job.setOutputKeyClass(Text.class);
    		    job.setOutputValueClass(IntWritable.class);
    		    
    		    //set mapper and reducer
    		    job.setMapperClass(WordCountMap.class);     
    		    job.setReducerClass(WordCountReduce.class);
    		    
    		    //set path of input-output
    		    FileInputFormat.addInputPath(job, new Path(args[0]));
    		    FileOutputFormat.setOutputPath(job, new Path(tempDir));
    		    
    		    
    		    
    		    if(job.waitForCompletion(true)){		    
    			    /**
    			     * run two-round to sort
    			     * */
    			    //Configuration conf2 = new Configuration();
    				Job job2 = new Job(conf, "jiq-wordcountjob-2");
    				job2.setJarByClass(WordCount.class);
    				
    				//set format of input-output
    				job2.setInputFormatClass(SequenceFileInputFormat.class);
    				job2.setOutputFormatClass(TextOutputFormat.class);		
    				
    				//set class of output's key-value
    				job2.setOutputKeyClass(IntWritable.class);
    			    job2.setOutputValueClass(Text.class);
    			    
    			    //set mapper and reducer
    			    //InverseMapper作用是实现map()之后的数据对的key和value交换
    			    //将Reducer的个数限定为1, 终于输出的结果文件就是一个
    				/**
    				* 注意,这里将reduce的数目设置为1个。有非常大的文章。
    				* 由于hadoop无法进行键的全局排序,仅仅能做一个reduce内部
    				* 的本地排序。

    所以我们要想有一个依照键的全局的排序。 * 最直接的方法就是设置reduce仅仅有一个。 */ job2.setMapperClass(InverseMapper.class); job2.setNumReduceTasks(1); //only one reducer //set path of input-output FileInputFormat.addInputPath(job2, new Path(tempDir)); FileOutputFormat.setOutputPath(job2, new Path(args[1])); /** * Hadoop 默认对 IntWritable 按升序排序,而我们须要的是按降序排列。 * 因此我们实现了一个 IntWritableDecreasingComparator 类,  * 并指定使用这个自己定义的 Comparator 类对输出结果中的 key (词频)进行排序 * */ job2.setSortComparatorClass(IntWritableDecreasingComparator.class); exit = job2.waitForCompletion(true); } }catch(Exception e){ e.printStackTrace(); }finally{ try { //delete tempt dir FileSystem.get(conf).deleteOnExit(new Path(tempDir)); if(exit) System.exit(1); System.exit(0); } catch (IOException e) { e.printStackTrace(); } } } }


    若有什么疑问和不吝赐教。欢迎交流。联系邮箱: jiq408694711@163.com  季义钦

    作为兴趣点,眼下本人正在研究HBase和Hadoop


查看全文
  • 相关阅读:
    多维梯度下降
    梯度下降
    三种评价函数
    Gluon sgd
    Gluon.vision的几类数据集
    Gluon Data API
    Gluon 实现 dropout 丢弃法
    AlexNet 分类 FashionMNIST
    LeNet 分类 FashionMNIST
    LeNet
  • 原文地址:https://www.cnblogs.com/ldxsuanfa/p/10801729.html
  • Copyright © 2011-2022 走看看