Text Mining: Text Clustering (MapReduce)

Liu Yong    Email: lyssym@sina.com

Introduction

For large volumes of text, single-threaded processing is slow on every front: the computation itself takes a long time, the I/O over so much data takes a long time, and memory consumption is heavy. This article therefore adopts the MapReduce computing model to process the text in a distributed fashion and raise throughput. The algorithm combines Kmeans and DBSCAN: from Kmeans it borrows the fixed, pre-determined number of clusters, from DBSCAN the density-based view, and it introduces multiple Reducers to merge the data in parallel. Test results show that with 457 texts and 50 iterations the algorithm is feasible; at this small scale its speed trails a single-threaded implementation, but once the data volume grows past a certain scale, the distributed algorithm's speed advantage should become increasingly apparent.

Related Models

From the standpoint of practical engineering use, the mathematical models involved are only briefly described below; for more detail, see the earlier posts in this series:

1) Cosine similarity

To judge the similarity of two Web texts, this article uses the cosine measure: both texts are first segmented (Chinese word segmentation) and quantized into term-frequency vectors, and the cosine of the two vectors is then taken as their similarity. To make the results more accurate, a synonym thesaurus (同义词词林) is loaded, so that synonymous terms are treated as the same dimension. No more is repeated here; for details see the earlier post in this series on text similarity.
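For reference, if the two documents are quantized (over their merged term list) into frequency vectors a = (a1, ..., an) and b = (b1, ..., bn), the similarity used throughout this article is the standard cosine:

cos(a, b) = (a1*b1 + ... + an*bn) / ( sqrt(a1^2 + ... + an^2) * sqrt(b1^2 + ... + bn^2) )

A value near 1 means the two texts are close; countCosSimilarity in the source below computes exactly this quantity.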

2) DBSCAN

DBSCAN involves two key parameters: the ε-neighborhood (roughly, a radius) and a minimum count minPts (a fixed density threshold). If the number of objects (here, Web texts) inside an object's ε-neighborhood is at least minPts, that object is a core object. Starting from a core object, the algorithm takes each object i in its ε-neighborhood, then the core or border objects in i's ε-neighborhood, and so on recursively, collecting everything reached into one cluster. DBSCAN thus finds maximal sets of density-connected objects; put plainly, if B can be found through A and C through B, then A, B and C belong to the same cluster. No more is repeated here; for details see the earlier post in this series on DBSCAN text clustering.
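As a minimal single-machine illustration of the core-object test (outside MapReduce), the sketch below assumes a caller-supplied similarity function; since this article measures closeness by cosine similarity, "inside the ε-neighborhood" is expressed as "similarity >= eps":

import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch of DBSCAN's core-object test: an object is a core
// object if at least minPts objects lie within its eps-neighborhood.
public class DbscanSketch {
    public static boolean isCoreObject(String p, List<String> corpus,
            BiFunction<String, String, Double> sim, double eps, int minPts) {
        int neighbors = 0;
        for (String q : corpus) {
            // With cosine similarity, "closer than eps" means sim >= eps.
            if (sim.apply(p, q) >= eps && ++neighbors >= minPts)
                return true;
        }
        return false;
    }
}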

3) Kmeans

Kmeans first initializes K cluster centers (centroids); K is fixed before clustering starts, whereas DBSCAN cannot know the number of clusters in advance. Each text is then compared with every centroid (here by cosine similarity) and joins the class of the centroid it fits best; after each round the centroids are recomputed, and this iterates until the centroids converge or the iteration limit is reached. The centroid recomputation borrows DBSCAN's density idea: within a class, the vector with the highest density becomes the new centroid, which departs from the distance-averaging update used by other variants. No more is repeated here.
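A single-threaded sketch of this modified loop (assign each text to the most similar centroid, then promote the densest member of each class to be its new centroid) may make the MapReduce version below easier to follow; the similarity function and the eps threshold here are illustrative assumptions:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Minimal sketch of the modified K-means loop described above.
public class KMeansSketch {
    public static List<String> cluster(List<String> texts, List<String> centers,
            BiFunction<String, String, Double> sim, double eps, int maxIter) {
        for (int iter = 0; iter < maxIter; iter++) {
            // 1) Assignment step: each text joins the most similar centroid.
            Map<Integer, List<String>> clusters = new HashMap<>();
            for (String t : texts) {
                int best = 0;
                double bestScore = -1;
                for (int k = 0; k < centers.size(); k++) {
                    double s = sim.apply(t, centers.get(k));
                    if (s > bestScore) { bestScore = s; best = k; }
                }
                clusters.computeIfAbsent(best, k -> new ArrayList<>()).add(t);
            }
            // 2) Update step (DBSCAN-style): the member with the most
            //    eps-neighbors, i.e. the densest one, becomes the new centroid.
            List<String> newCenters = new ArrayList<>();
            for (List<String> members : clusters.values()) {
                String densest = members.get(0);
                int maxNeighbors = -1;
                for (String c : members) {
                    int n = 0;
                    for (String other : members)
                        if (sim.apply(c, other) >= eps) n++;
                    if (n > maxNeighbors) { maxNeighbors = n; densest = c; }
                }
                newCenters.add(densest);
            }
            // Stop early once the centroid set no longer changes.
            if (new HashSet<>(newCenters).equals(new HashSet<>(centers)))
                break;
            centers = newCenters;
        }
        return centers;
    }
}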

Implementation of the Improved Algorithm on MapReduce

     

Figure 1. Framework of the improved algorithm based on MapReduce

Figure 1 shows the framework of one pass of the improved MapReduce-based algorithm. Its core parts are explained as follows:

1) On the Mapper side, K classes and their initial centroids are determined as in Kmeans; each text is then clustered once against these centroids and assigned to the class of the centroid it is most similar to.

2) On the Reducer side, following DBSCAN, the number of members inside each candidate's ε-neighborhood is counted within its class, and the member with the most ε-neighbors, i.e. the densest member, becomes the new centroid.

3) On the Reducer side, five Reducers are used to recompute the class centroids in parallel, to speed the program up (an illustrative partitioner sketch follows this list).

4) On the Mapper side, the new class centroids needed for each iteration are obtained by reading cache files.
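Regarding point 3, note that nothing special is needed to route one whole class to one Reducer: with job.setNumReduceTasks(5), Hadoop's default HashPartitioner already maps each class ID to a fixed reducer. An explicit partitioner with the equivalent behavior, shown only for illustration, would look like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: equivalent to the default HashPartitioner for this job.
public class ClusterPartitioner extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable clusterId, Text doc, int numReducers) {
        // Each class ID maps deterministically to one reducer, so all
        // documents of a class are regrouped on the same reducer.
        return (clusterId.get() & Integer.MAX_VALUE) % numReducers;
    }
}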

The inputs and outputs of the Mapper and Reducer in this design are as follows:

Mapper: <Object, Text> --> <IntWritable, Text>

Input: key is unused; value is a Web text.

Output: key is the class ID; value is the Web text.

The Mapper's design goal: compute, for each text, the class it belongs to, i.e. its class ID.

Reducer: <IntWritable, Text> --> <NullWritable, Text>

Input: key is the class ID; value is the Web texts of that class.

Output: key is Null; value is a Web text, namely the new centroid.

The Reducer's design goal: determine the new centroid for each class.
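For example, a title assigned to class 3 leaves the Mapper as the pair (3, "<title text>"); the Reducer that receives key 3 then emits only the densest member of that group, which becomes class 3's new centroid.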

Test Results and Performance Analysis

Since the purpose of this test is to verify the feasibility of the MapReduce-based text clustering algorithm, the data set was kept small: 457 Web page titles grabbed at random from the network, clustered over 50 iterations; the iterations serve to let each class's centroid converge.

Table 1. Test results of the improved Kmeans and DBSCAN text clustering algorithm

The results in Table 1 show that at small data scale, single-threaded processing is clearly faster than MapReduce. The main reason is that under the MapReduce framework every iteration must reload the dictionaries and read/write the cache files to fetch or update the centroids; at small scale the actual compute time is less than this I/O time, so the framework's advantages do not show. The author tried using Java reflection to load the data objects in an attempt to mitigate this, with little effect.

However, under the MapReduce framework, using multiple Reducers to compute the new centroids noticeably improves the speed of the reduction step; compared with single-threaded processing it both saves memory and keeps the processing simple and convenient. Given the ever-growing scale of text data, distributed frameworks for processing massive text collections are becoming the norm, so the algorithm proposed here has some practical significance.

Program source code:

package com.gta.cosine;

// A term and its frequency in one document; used as the vector element type.
public class ElementDict {
    private String term;
    private int freq;

    public ElementDict(String term, int freq) {
        this.term = term;
        this.freq = freq;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public String getTerm() {
        return term;
    }

    public int getFreq() {
        return freq;
    }

    // Fixed: made a proper Object.equals override (the original overloaded
    // equals(ElementDict), which collections ignore) and added hashCode.
    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (!(o instanceof ElementDict))
            return false;
        ElementDict e = (ElementDict) o;
        return term.equals(e.term) && freq == e.freq;
    }

    @Override
    public int hashCode() {
        return 31 * term.hashCode() + freq;
    }
}
    Class ElementDict
package com.gta.cosine;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.wltea.analyzer.lucene.IKAnalyzer;

public class TextCosine {
    private Map<String, String> map = null;   // synonym dictionary: term -> synonym
    private double common;                    // threshold for texts of similar length
    private double special;                   // threshold for the sliding-window case
    private static final String PATH = "hdfs://10.1.130.10:9000";
    private static Logger logger = LogManager.getLogger(TextCosine.class);

    public TextCosine() {
        map = new HashMap<String, String>();
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(PATH), conf);
            Path path = new Path("/user/hadoop/doc/synonyms.dict");
            FSDataInputStream is = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            String s = null;
            while ((s = br.readLine()) != null) {
                String[] synonymsEnum = s.split("→");
                map.put(synonymsEnum[0], synonymsEnum[1]);
            }
            br.close();
        } catch (IOException e) {
            logger.error("TextCosine IOException!");
        }
    }

    // Fixed: delegate to the no-arg constructor instead of duplicating the
    // dictionary-loading code.
    public TextCosine(double common, double special) {
        this();
        this.common = common;
        this.special = special;
    }

    public void setCommon(double common) {
        this.common = common;
    }

    public void setSpecial(double special) {
        this.special = special;
    }

    // Segment the text with IK Analyzer and build a term-frequency list.
    public List<ElementDict> tokenizer(String str) {
        List<ElementDict> list = new ArrayList<ElementDict>();
        IKAnalyzer analyzer = new IKAnalyzer(true);
        try {
            TokenStream stream = analyzer.tokenStream("", str);
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            int index = -1;
            while (stream.incrementToken()) {
                if ((index = isContain(cta.toString(), list)) >= 0) {
                    list.get(index).setFreq(list.get(index).getFreq() + 1);
                } else {
                    list.add(new ElementDict(cta.toString(), 1));
                }
            }
            analyzer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    // Return the index of the term (or of its synonym) in the list, -1 if absent.
    public int isContain(String str, List<ElementDict> list) {
        for (ElementDict ed : list) {
            if (ed.getTerm().equals(str)) {
                return list.indexOf(ed);
            } else if (map.get(ed.getTerm()) != null && map.get(ed.getTerm()).equals(str)) {
                return list.indexOf(ed);
            }
        }
        return -1;
    }

    // Merge the terms of both lists to align the two vectors. Fixed: the
    // original re-added a term that was already present whenever its synonym
    // was missing, producing duplicate dimensions; a term is now added only
    // if neither it nor its synonym is already present.
    public List<String> mergeTerms(List<ElementDict> list1, List<ElementDict> list2) {
        List<String> list = new ArrayList<String>();
        for (ElementDict ed : list1) {
            String synonym = map.get(ed.getTerm());
            if (!list.contains(ed.getTerm()) && (synonym == null || !list.contains(synonym))) {
                list.add(ed.getTerm());
            }
        }
        for (ElementDict ed : list2) {
            String synonym = map.get(ed.getTerm());
            if (!list.contains(ed.getTerm()) && (synonym == null || !list.contains(synonym))) {
                list.add(ed.getTerm());
            }
        }
        return list;
    }

    // When one text is much longer than the other (1.5x), slide a window of
    // the shorter text's length over the longer one and keep the best cosine.
    public double analysisText(List<ElementDict> list1, List<ElementDict> list2) {
        int len1 = list1.size();
        int len2 = list2.size();
        double ret = 0;
        if (len2 >= len1 * 1.5) {
            for (int i = 0; i + len1 <= len2; i++) {
                // Fixed: the window is rebuilt at every offset (it previously
                // accumulated across iterations).
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len1; j++)
                    newList.add(list2.get(i + j));
                newList = adjustList(newList, list2, len2, len1, i);
                double tmp = analysis(list1, newList);
                if (tmp > ret)
                    ret = tmp;
            }
        } else if (len1 >= len2 * 1.5) {
            for (int i = 0; i + len2 <= len1; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len2; j++)
                    newList.add(list1.get(i + j));
                newList = adjustList(newList, list1, len1, len2, i);
                // Fixed: the original compared list1 with a window of itself.
                double tmp = analysis(list2, newList);
                if (tmp > ret)
                    ret = tmp;
            }
        } else {
            ret = analysis(list1, list2);
        }
        return ret;
    }

    // Pad the window with up to two neighboring elements of the longer list.
    // Fixed: the loop condition in the else-branch was `i > size`, which
    // never executed.
    public List<ElementDict> adjustList(List<ElementDict> newList, List<ElementDict> list, int lenBig, int lenSmall, int index) {
        int gap = lenBig - lenSmall;
        int size = (gap / 2 > 2) ? 2 : gap / 2;
        if (index < gap / 2) {
            for (int i = 0; i < size; i++) {
                newList.add(list.get(lenSmall + index + i));
            }
        } else {
            for (int i = 0; i < size; i++) {
                newList.add(list.get(lenBig - index - i));
            }
        }
        return newList;
    }

    // Align the two term lists to a common dimension and compute the cosine.
    public double analysis(List<ElementDict> list1, List<ElementDict> list2) {
        List<String> list = mergeTerms(list1, list2);
        List<Integer> weightList1 = assignWeight(list, list1);
        List<Integer> weightList2 = assignWeight(list, list2);
        return countCosSimilarity(weightList1, weightList2);
    }

    // Assign each merged term its frequency in the given document (0 if absent).
    public List<Integer> assignWeight(List<String> list, List<ElementDict> list1) {
        List<Integer> vecList = new ArrayList<Integer>(list.size());
        for (String str : list) {
            boolean isEqual = false;
            for (ElementDict ed : list1) {
                if (ed.getTerm().equals(str)
                        || (map.get(ed.getTerm()) != null && map.get(ed.getTerm()).equals(str))) {
                    isEqual = true;
                    vecList.add(Integer.valueOf(ed.getFreq()));
                    break;  // fixed: stop at the first match so each dimension gets one weight
                }
            }
            if (!isEqual) {
                vecList.add(Integer.valueOf(0));
            }
        }
        return vecList;
    }

    // Cosine of the two aligned weight vectors.
    public double countCosSimilarity(List<Integer> list1, List<Integer> list2) {
        int element = 0;
        int denominator1 = 0;
        int denominator2 = 0;
        int index = -1;
        for (Integer it : list1) {
            index++;
            int left = it.intValue();
            int right = list2.get(index).intValue();
            element += left * right;
            denominator1 += left * left;
            denominator2 += right * right;
        }
        // Fixed: double division never throws ArithmeticException; a zero
        // denominator yields NaN, so guard it explicitly.
        if (denominator1 == 0 || denominator2 == 0)
            return 0;
        return (double) element / Math.sqrt((double) denominator1 * denominator2);
    }

    public boolean isSimilarity(double param, double score) {
        return score >= param;
    }

    public boolean assertSimilarity(List<ElementDict> list1, List<ElementDict> list2) {
        int len1 = list1.size();
        int len2 = list2.size();
        if (len2 >= len1 * 1.5) {
            for (int i = 0; i + len1 <= len2; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len1; j++)
                    newList.add(list2.get(i + j));
                newList = adjustList(newList, list2, len2, len1, i);
                if (isSimilarity(special, analysis(list1, newList)))
                    return true;
            }
        } else if (len1 >= len2 * 1.5) {
            for (int i = 0; i + len2 <= len1; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len2; j++)
                    newList.add(list1.get(i + j));
                newList = adjustList(newList, list1, len1, len2, i);
                // Fixed: compare the window against list2, not list1.
                if (isSimilarity(special, analysis(list2, newList)))
                    return true;
            }
        } else {
            if (isSimilarity(common, analysis(list1, list2)))
                return true;
        }
        return false;
    }
}
    Class TextCosine
package com.gta.util;

import java.util.Collections;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;

public class DensityCenter {
    private Logger logger = LogManager.getLogger(DensityCenter.class);
    private double eps;          // neighborhood threshold: cosine similarity >= eps
    private TextCosine cosine;

    public DensityCenter(double eps, TextCosine cosine) {
        this.eps = eps;
        this.cosine = cosine;
    }

    // Despite the name, this returns a cosine *similarity* (larger = closer).
    public double cosineDistance(String src, String dst) {
        List<ElementDict> vec1 = cosine.tokenizer(src);
        List<ElementDict> vec2 = cosine.tokenizer(dst);
        return cosine.analysisText(vec1, vec2);
    }

    // Count how many texts lie in the eps-neighborhood of src.
    public int getNeighbors(String src, List<String> dst) {
        int ret = 0;
        for (String s : dst) {
            if (cosineDistance(src, s) >= eps)
                ret++;
        }
        return ret;
    }

    // The member with the most eps-neighbors, i.e. the densest one, is the new center.
    public String getDensityCenter(List<String> text) {
        int max = 0;
        int i = 0;
        int index = 0;
        for (String s : text) {
            int ret = getNeighbors(s, text);
            if (ret > max) {
                index = i;
                max = ret;
            }
            i++;
        }
        return text.get(index);
    }

    // Convergence test: the old and new center sets are identical.
    public boolean compareCenters(List<String> oldCenters, List<String> newCenters) {
        Collections.sort(oldCenters);
        Collections.sort(newCenters);
        int oldSize = oldCenters.size();
        int newSize = newCenters.size();
        logger.info("oldSize : " + oldSize);
        logger.info("newSize : " + newSize);
        // Fixed: lists of different sizes cannot be identical center sets;
        // the original could report convergence after comparing only a prefix.
        if (oldSize != newSize)
            return false;
        int count = 0;
        for (int i = 0; i < oldSize; i++) {
            if (oldCenters.get(i).equals(newCenters.get(i)))
                count++;
        }
        logger.info("count : " + count);
        return count == oldSize;
    }
}
    Class DensityCenter
package com.gta.cluster;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;
import com.gta.util.DensityCenter;

public class KMeansProcess {

    public static class TextMapper extends Mapper<Object, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(TextMapper.class);
        // Static so the driver can update the centers between iterations
        // without re-reading the cache files (see the note after the listings).
        public static List<String> centersList = new ArrayList<String>();
        public static TextCosine cosine = new TextCosine();

        @Override
        public void setup(Context context) {
            // Only the first iteration reads the initial centers from the
            // distributed cache; later iterations reuse the static list.
            int iteration = context.getConfiguration().getInt("ITERATION", 100);
            if (iteration == 0) {
                int task = context.getConfiguration().getInt("TASK", 0);
                try {
                    URI[] caches = context.getCacheFiles();
                    if (caches == null || caches.length <= 0) {
                        System.exit(1);
                    }
                    for (int i = 0; i < task; i++) {
                        FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
                        FSDataInputStream is = fs.open(new Path(caches[i].toString()));
                        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                        String s = null;
                        while ((s = br.readLine()) != null) {
                            centersList.add(s);
                        }
                        br.close();
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
        }

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                String str = value.toString();
                double score = 0;
                double countTmp = 0;
                int clusterID = 0;
                int index = 0;
                List<ElementDict> vec1 = cosine.tokenizer(str);
                // Assign the text to the most similar center.
                for (String s : centersList) {
                    List<ElementDict> vec2 = cosine.tokenizer(s);
                    countTmp = cosine.analysisText(vec1, vec2);
                    if (countTmp > score) {
                        clusterID = index;
                        score = countTmp;
                    }
                    index++;
                }
                context.write(new IntWritable(clusterID), new Text(str));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public static class TextReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
        private static Logger logger = LogManager.getLogger(TextReducer.class);
        public static DensityCenter center = new DensityCenter(0.75, KMeansProcess.TextMapper.cosine);

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) {
            try {
                List<String> list = new ArrayList<String>();
                for (Text val : values) {
                    list.add(val.toString());
                }
                // Emit the densest member of the class as its new center.
                context.write(NullWritable.get(), new Text(center.getDensityCenter(list)));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }
}
    Class KMeansProcess
package com.gta.cluster;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;

public class KMeans {

    public static class KMeansMapper extends Mapper<Object, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(KMeans.KMeansMapper.class);
        private List<String> centersList = KMeansProcess.TextMapper.centersList;
        private TextCosine cosine = KMeansProcess.TextMapper.cosine;

        @Override
        public void setup(Context context) {
            // The final pass always re-reads the converged centers from the cache files.
            int task = context.getConfiguration().getInt("TASK", 0);
            try {
                URI[] caches = context.getCacheFiles();
                if (caches == null || caches.length <= 0) {
                    System.exit(1);
                }
                for (int i = 0; i < task; i++) {
                    FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
                    FSDataInputStream is = fs.open(new Path(caches[i].toString()));
                    BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                    String s = null;
                    while ((s = br.readLine()) != null)
                        centersList.add(s);
                    br.close();
                }
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                String str = value.toString();
                double score = 0;
                double countTmp = 0;
                int clusterID = 0;
                int index = 0;
                List<ElementDict> vec1 = cosine.tokenizer(str);
                for (String s : centersList) {
                    List<ElementDict> vec2 = cosine.tokenizer(s);
                    countTmp = cosine.analysisText(vec1, vec2);
                    if (countTmp > score) {
                        clusterID = index;
                        score = countTmp;
                    }
                    index++;
                }
                context.write(new IntWritable(clusterID), new Text(str));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }

        @Override
        public void cleanup(Context context) {
            centersList.clear();
        }
    }

    public static class KMeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(KMeans.KMeansReducer.class);

        // Fixed: the method was misspelled "ruduce", so it never overrode
        // Reducer.reduce and the identity reducer ran instead.
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) {
            try {
                // Emit each text together with its final class ID.
                for (Text val : values) {
                    context.write(key, val);
                }
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }
}
    Class KMeans
package com.gta.cluster;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.gta.cluster.KMeans.KMeansMapper;
import com.gta.cluster.KMeans.KMeansReducer;
import com.gta.cluster.KMeansProcess.TextMapper;
import com.gta.cluster.KMeansProcess.TextReducer;

public class Cluster {
    public static final int MAX = 50;   // iteration cap
    public static final String INPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/input/";
    public static final String OUTPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/output/";
    public static final String TMP_PATH = "hdfs://10.1.130.10:9000/user/hadoop/tmp/";
    public static final int TASK = 5;   // number of reducers
    public static Logger logger = LogManager.getLogger(Cluster.class);
    private Configuration conf;
    private int iteration = 0;

    public Cluster() {
        this.conf = new Configuration();
        conf.setInt("TASK", TASK);
    }

    // Iterate the KMeansProcess job until the centers converge or MAX rounds pass.
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        while (iteration < MAX) {
            logger.info("Iteration: " + (iteration + 1));
            conf.setInt("ITERATION", iteration);
            Job job = Job.getInstance(conf, "KMeans Process");
            if (iteration == 0) {
                // The initial centers are expected in tmp/0/part-r-00000..4.
                String cacheFile = TMP_PATH + iteration + "/part-r-0000";
                for (int i = 0; i < TASK; i++)
                    job.addCacheFile(URI.create(cacheFile + i));
            }
            job.setJarByClass(KMeansProcess.class);
            job.setMapperClass(TextMapper.class);
            job.setNumReduceTasks(TASK);
            job.setReducerClass(TextReducer.class);
            // Fixed: the mapper and reducer emit different key types, so both
            // must be declared (the original only set IntWritable/Text).
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            iteration++;
            String outFile = TMP_PATH + iteration;
            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, new Path(outFile));
            job.waitForCompletion(true);
            conf.unset("ITERATION");
            List<String> tmpList = getCenterList(outFile);
            if (KMeansProcess.TextReducer.center.compareCenters(KMeansProcess.TextMapper.centersList, tmpList))
                break;  // converged
            KMeansProcess.TextMapper.centersList.clear();
            KMeansProcess.TextMapper.centersList.addAll(tmpList);
        }
    }

    // One final pass that labels every text with its converged class ID.
    public void lastRun() throws IOException, InterruptedException, ClassNotFoundException {
        String cacheFile = TMP_PATH + iteration + "/part-r-0000";
        Job job = Job.getInstance(conf, "KMeans");
        for (int i = 0; i < TASK; i++)
            job.addCacheFile(URI.create(cacheFile + i));
        job.setJarByClass(KMeans.class);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.waitForCompletion(true);
    }

    // Read the new centers back from the TASK reducer output files.
    public List<String> getCenterList(String outFile) {
        List<String> centerList = new ArrayList<String>();
        String fileName = outFile + "/part-r-0000";
        try {
            for (int i = 0; i < TASK; i++) {
                FileSystem fs = FileSystem.get(URI.create(fileName + i), conf);
                FSDataInputStream is = fs.open(new Path(fileName + i));
                BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                String s = null;
                while ((s = br.readLine()) != null)
                    centerList.add(s);
                br.close();
            }
        } catch (IOException e) {
            logger.info(e.getMessage());
        }
        return centerList;
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        try {
            long start = System.currentTimeMillis();
            cluster.run();
            cluster.lastRun();
            long end = System.currentTimeMillis();
            Cluster.logger.info("Total time (ms): " + (end - start));
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    Class Cluster

Since in a distributed environment the repeated iterations would otherwise re-read the cache files every time, static variables are introduced to avoid re-initializing TextCosine and the centers list, with the aim of improving text-processing throughput. The author repeatedly tried to pass the object instances into the Job directly, but every attempt failed; if you have a better solution, please contact me.
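One caveat worth stating plainly: static fields are shared between the driver and the tasks only when they run in the same JVM (e.g. Hadoop local mode); on a real cluster each task JVM gets its own copy. A common alternative to passing object state into a Job is to serialize the centers into the Configuration and parse them back in setup(). A minimal sketch follows; the class, the key name kmeans.centers, and the newline separator are assumptions for illustration, not part of the original design:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: ship the center list through the job Configuration
// instead of static fields.
public class CenterConfig {
    private static final String KEY = "kmeans.centers";

    // Driver side, before job submission.
    public static void store(Configuration conf, List<String> centers) {
        conf.set(KEY, String.join("\n", centers));
    }

    // Task side, e.g. in Mapper.setup().
    public static List<String> load(Configuration conf) {
        String packed = conf.get(KEY, "");
        return packed.isEmpty()
                ? new ArrayList<String>()
                : new ArrayList<String>(Arrays.asList(packed.split("\n")));
    }
}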

     

     

