zoukankan      html  css  js  c++  java
  • MapReduce -- TF-IDF

    通过MapReduce实现 TF-IDF值的统计

    数据:文章ID  文件内容

    3823890378201539    今天约了姐妹去逛街吃美食,周末玩得很开心啊!
    ......
    ......

    结果数据:

    3823890378201539    开心:0.28558719539400335    吃:0.21277211221173534    了:0.1159152517783012    美食:0.29174432675350614    去:0.18044286652763497    玩:0.27205714412756765    啊:0.26272169358877784    姐妹:0.3983823545319593    逛街:0.33320559604063593    得很:0.45170136842118586    周末:0.2672478858982343    今天:0.16923426566752778    约:0.0946874743049455
    ......
    ......

    在整个的处理过程中通过两步来完成

    第一步主要生成三种格式的文件

    1、使用分词工具将文章内容进行拆分成多个词条;并记录文章的总词条数 关于分词工具的使用请参考  TF-IDF
    第一步处理后结果:

    今天_3823890378201539    A:1,B:13,
    周末_3823890378201539    A:1,B:13,
    得很_3823890378201539    A:1,B:13,
    约_3823890378201539    B:13,A:1,
    ......

    2、记录词条在多少篇文章中出现过

    处理后结果:

    今天    118
    周末    33311
    ......

    3、记录文章总数

    处理后结果:

    counter    1065

    第二步将文件2,3的内容加载到缓存,利用2,3文件的内容对文件1的内容通过mapreduce进行计算

    针对数据量不是很大的数据可以加载到缓存,如果数据量过大,不考虑这种方式;

    源码

    Step1.java:

      1 import org.apache.hadoop.conf.Configuration;
      2 import org.apache.hadoop.fs.FileSystem;
      3 import org.apache.hadoop.fs.Path;
      4 import org.apache.hadoop.io.LongWritable;
      5 import org.apache.hadoop.io.Text;
      6 import org.apache.hadoop.mapreduce.Job;
      7 import org.apache.hadoop.mapreduce.Mapper;
      8 import org.apache.hadoop.mapreduce.Reducer;
      9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     11 import org.wltea.analyzer.core.IKSegmenter;
     12 import org.wltea.analyzer.core.Lexeme;
     13 
     14 import java.io.IOException;
     15 import java.io.StringReader;
     16 import java.util.HashMap;
     17 import java.util.Map;
     18 import java.util.Map.Entry;
     19 
     20 /**
     21  * Created by Edward on 2016/7/21.
     22  */
     23 public class Step1 {
     24 
     25     public static void main(String[] args)
     26     {
     27         //access hdfs's user
     28         //System.setProperty("HADOOP_USER_NAME","root");
     29 
     30         Configuration conf = new Configuration();
     31         conf.set("fs.defaultFS", "hdfs://node1:8020");
     32 
     33         try {
     34             FileSystem fs = FileSystem.get(conf);
     35 
     36             Job job = Job.getInstance(conf);
     37             job.setJarByClass(RunJob.class);
     38             job.setMapperClass(MyMapper.class);
     39             job.setReducerClass(MyReducer.class);
     40             job.setPartitionerClass(FilterPartition.class);
     41 
     42             //需要指定 map out 的 key 和 value
     43             job.setOutputKeyClass(Text.class);
     44             job.setOutputValueClass(Text.class);
     45 
     46             //设置reduce task的数量
     47             job.setNumReduceTasks(4);
     48 
     49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/input"));
     50 
     51             Path path = new Path("/test/tfidf/output");
     52             if(fs.exists(path))//如果目录存在,则删除目录
     53             {
     54                 fs.delete(path,true);
     55             }
     56             FileOutputFormat.setOutputPath(job, path);
     57 
     58             boolean b = job.waitForCompletion(true);
     59             if(b)
     60             {
     61                 System.out.println("OK");
     62             }
     63 
     64         } catch (Exception e) {
     65             e.printStackTrace();
     66         }
     67     }
     68 
     69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
     70         @Override
     71         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     72             Map<String, Integer> map = new HashMap<String, Integer>();
     73 
     74             String[] str = value.toString().split("	");
     75             StringReader stringReader = new StringReader(str[1]);
     76             IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
     77             Lexeme lexeme = null;
     78             Long count = 0l;
     79             while((lexeme = ikSegmenter.next())!=null) {
     80                 String word = lexeme.getLexemeText();
     81                 if(map.containsKey(word)) {
     82                     map.put(word, map.get(word)+1);
     83                 }
     84                 else{
     85                     map.put(word, 1);
     86                 }
     87                 count++;
     88             }
     89             for(Entry<String, Integer> entry: map.entrySet())
     90             {
     91                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("A:"+entry.getValue()));//tf词条在此文章中的个数
     92                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("B:"+count));//此文章中的总词条数
     93                 context.write(new Text(entry.getKey()),new Text("1"));//词条在此文章中出现+1,计算词条在那些文章中出现过
     94             }
     95             context.write(new Text("counter"), new Text(1+""));//文章数累加器
     96         }
     97     }
     98 
     99     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    100         @Override
    101         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    102 
    103             //计算总文章数
    104             if(key.toString().equals("conter")) {
    105                 long sum = 0l;
    106                 for(Text v :values)
    107                 {
    108                     sum += Long.parseLong(v.toString());
    109                 }
    110                 context.write(key, new Text(sum+""));
    111             }
    112             else{
    113                 if(key.toString().contains("_")) {
    114                     StringBuilder stringBuilder = new StringBuilder();
    115                     for (Text v : values) {
    116                         stringBuilder.append(v.toString());
    117                         stringBuilder.append(",");
    118                     }
    119                     context.write(key, new Text(stringBuilder.toString()));
    120                 }
    121                 else {//计算词条在那些文章中出现过
    122                     long sum = 0l;
    123                     for(Text v :values)
    124                     {
    125                         sum += Long.parseLong(v.toString());
    126                     }
    127                     context.write(key, new Text(sum+""));
    128                 }
    129             }
    130         }
    131     }
    132 }

    FilterPartition.java

     1 import org.apache.hadoop.io.Text;
     2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
     3 
     4 /**
     5  * Created by Edward on 2016/7/22.
     6  */
     7 public class FilterPartition extends HashPartitioner<Text, Text> {
     8 
     9     @Override
    10     public int getPartition(Text key, Text value, int numReduceTasks) {
    11 
    12         if(key.toString().contains("counter"))
    13         {
    14             return numReduceTasks-1;
    15         }
    16 
    17         if(key.toString().contains("_"))
    18         {
    19             return super.getPartition(key, value, numReduceTasks-2);
    20         }
    21         else
    22         {
    23             return numReduceTasks-2;
    24         }
    25     }
    26 }

    Step2.java

      1 import org.apache.hadoop.conf.Configuration;
      2 import org.apache.hadoop.fs.FileSystem;
      3 import org.apache.hadoop.fs.Path;
      4 import org.apache.hadoop.io.LongWritable;
      5 import org.apache.hadoop.io.Text;
      6 import org.apache.hadoop.mapreduce.Job;
      7 import org.apache.hadoop.mapreduce.Mapper;
      8 import org.apache.hadoop.mapreduce.Reducer;
      9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     11 
     12 import java.io.BufferedReader;
     13 import java.io.FileReader;
     14 import java.io.IOException;
     15 import java.net.URI;
     16 import java.util.HashMap;
     17 import java.util.Map;
     18 
     19 /**
     20  * Created by Edward on 2016/7/22.
     21  */
     22 public class Step2 {
     23     public static void main(String[] args)
     24     {
     25         //access hdfs's user
     26         //System.setProperty("HADOOP_USER_NAME","root");
     27 
     28         Configuration conf = new Configuration();
     29         conf.set("fs.defaultFS", "hdfs://node1:8020");
     30 
     31         try {
     32             FileSystem fs = FileSystem.get(conf);
     33 
     34             Job job = Job.getInstance(conf);
     35             job.setJarByClass(RunJob.class);
     36             job.setMapperClass(MyMapper.class);
     37             job.setReducerClass(MyReducer.class);
     38 
     39             //需要指定 map out 的 key 和 value
     40             job.setOutputKeyClass(Text.class);
     41             job.setOutputValueClass(Text.class);
     42 
     43             //分布式缓存,每个slave都能读到数据
     44                 //词条在多少文章中出现过
     45             job.addCacheFile(new Path("/test/tfidf/output/part-r-00002").toUri());
     46                 //文章的总数
     47             job.addCacheFile(new Path("/test/tfidf/output/part-r-00003").toUri());
     48 
     49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/output"));
     50 
     51             Path path = new Path("/test/tfidf/output1");
     52             if(fs.exists(path))//如果目录存在,则删除目录
     53             {
     54                 fs.delete(path,true);
     55             }
     56             FileOutputFormat.setOutputPath(job, path);
     57 
     58             boolean b = job.waitForCompletion(true);
     59             if(b)
     60             {
     61                 System.out.println("OK");
     62             }
     63         } catch (Exception e) {
     64             e.printStackTrace();
     65         }
     66     }
     67 
     68 
     69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
     70 
     71         public static Map<String, Double> dfmap = new HashMap<String, Double>();
     72 
     73         public static Map<String, Double> totalmap = new HashMap<String, Double>();
     74 
     75         @Override
     76         protected void setup(Context context) throws IOException, InterruptedException {
     77             URI[] cacheFiles = context.getCacheFiles();
     78             Path pArtNum = new Path(cacheFiles[0].getPath());
     79             Path pArtTotal = new Path(cacheFiles[1].getPath());
     80 
     81             //加载词条在多少篇文章中出现过
     82             BufferedReader buffer = new BufferedReader(new FileReader(pArtNum.getName()));
     83             String line = null;
     84             while((line = buffer.readLine()) != null){
     85                 String[] str = line.split("	");
     86                 dfmap.put(str[0], Double.parseDouble(str[1]));
     87             }
     88 
     89             //加载文章总数
     90             buffer = new BufferedReader(new FileReader(pArtTotal.getName()));
     91             line = null;
     92             while((line = buffer.readLine()) != null){
     93                 String[] str = line.split("	");
     94                 totalmap.put(str[0], Double.parseDouble(str[1]));
     95             }
     96         }
     97 
     98         @Override
     99         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    100 
    101             String[] strings = value.toString().split("	");
    102             String k = strings[0];
    103 
    104             if(k.contains("counter")) {
    105                 //过滤掉 文章总数
    106             }
    107             else if(k.contains("_")){
    108                 String word = k.split("_")[0];
    109                 String[] info = strings[1].split(",");
    110                 String n=null;
    111                 String num=null;
    112                 if(info[0].contains("A")){
    113                     n = info[0].substring(info[0].indexOf(":")+1);
    114                     num = info[1].substring(info[0].indexOf(":")+1);
    115                 }
    116                 if(info[0].contains("B")){
    117                     num = info[0].substring(info[0].indexOf(":")+1);
    118                     n = info[1].substring(info[0].indexOf(":")+1);
    119                 }
    120                 double result = 0l;
    121 
    122                 result = (Double.parseDouble(n)/Double.parseDouble(num)) * Math.log( totalmap.get("counter")/dfmap.get(word));
    123                 System.out.println("n=" + Double.parseDouble(n));
    124                 System.out.println("num=" + Double.parseDouble(num));
    125                 System.out.println("counter=" + totalmap.get("counter"));
    126                 System.out.println("wordnum=" + dfmap.get(word));
    127                 context.write(new Text(k.split("_")[1]), new Text(word+":"+result));
    128             }
    129             else{
    130                 //过滤掉 词条在多少篇文章中出现过
    131             }
    132         }
    133     }
    134 
    135     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    136         @Override
    137         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    138 
    139             StringBuilder stringBuilder = new StringBuilder();
    140             for(Text t: values){
    141                 stringBuilder.append(t.toString());
    142                 stringBuilder.append("	");
    143             }
    144             context.write(key, new Text(stringBuilder.toString()) );
    145         }
    146     }
    147 }
  • 相关阅读:
    java设计模式——多例模式
    Java多例模式
    设计模式(四)——多例模式
    IoC是一个很大的概念,可以用不同的方式实现。
    现有的框架实际上使用以下三种基本技术的框架执行服务和部件间的绑定:
    IOC关注服务(或应用程序部件)是如何定义的以及他们应该如何定位他们依赖的其它服务
    IoC最大的好处是什么?
    Java – Top 5 Exception Handling Coding Practices to Avoid
    @Spring MVC请求处理流程
    Spring Bean的生命周期(非常详细)
  • 原文地址:https://www.cnblogs.com/one--way/p/5695875.html
Copyright © 2011-2022 走看看