zoukankan      html  css  js  c++  java
  • 文本挖掘之文本聚类(MapReduce)

    刘 勇  Email:lyssym@sina.com






      本文在判别Web文本数据的相似度时采用余弦相似度:先对两篇Web文本进行中文分词并量化为词频向量,然后计算这两个向量的余弦值,进而判定其相似度。为了让处理结果更加准确,引入了同义词词林,即加载同义词词典。更多内容在此不再赘述,详见本系列之《文本挖掘之文本相似度判定》。







    图-1 基于MapReduce的改进算法框架图


      1)  在Mapper端,借鉴Kmeans算法确定K个类别及其初始质心,然后根据这些质心对所有文本进行一次聚类:文本与哪个质心的相似度最高,就归入该质心所属的类别。





      Mapper :  <Object,Text>--><IntWritable, Text>

      输入:key未使用, value为Web文本数据;



      Reducer:  <IntWritable, Text>--><NullWritable, Text>

      输入:key为文本类别ID, value为Web文本数据;

      输出:key为Null, value为Web文本数据,即新的质心。




    表-1 改进的Kmeans和DBSCAN文本聚类算法测试结果




     1 public class ElementDict {
     2     private String term;
     3     private int freq;
     5     public ElementDict(String term, int freq) {
     6         this.term = term;
     7         this.freq = freq;
     8     }
    11     public void setFreq (int freq) {
    12         this.freq = freq;
    13     }
    16     public String getTerm() {
    17         return term;
    18     }
    21     public int getFreq() {
    22         return freq;
    23     }
    26     public boolean equals(ElementDict e) {
    27         boolean ret = false;
    28         if (term.equals(e.getTerm()) && freq == e.getFreq())
    29         {
    30             ret = true;
    31         }
    33         return ret;
    34     }
    35 }
    Class ElementDict
      1 import java.io.BufferedReader;
      2 import java.io.IOException;
      3 import java.io.InputStreamReader;
      4 import java.net.URI;
      5 import java.util.HashMap;
      6 import java.util.List;
      7 import java.util.ArrayList;
      8 import java.util.Map;
      9 import org.apache.lucene.analysis.TokenStream;
     10 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
     11 import org.wltea.analyzer.lucene.IKAnalyzer;
     12 import org.apache.hadoop.conf.Configuration;
     13 import org.apache.hadoop.fs.FSDataInputStream;
     14 import org.apache.hadoop.fs.FileSystem;
     15 import org.apache.hadoop.fs.Path;
     16 import org.apache.logging.log4j.LogManager;
     17 import org.apache.logging.log4j.Logger;
     19 public class TextCosine {
     20     private Map<String, String> map= null;
     21     private double common;
     22     private double special;
     23     private static final String PATH = "hdfs://";
     24     private static Logger logger = LogManager.getLogger(TextCosine.class);
     26     public TextCosine() {
     27         map = new HashMap<String, String>();
     28         try {
     29             Configuration conf = new Configuration();
     30             FileSystem fs = FileSystem.get(URI.create(PATH), conf);
     31             Path path = new Path("/user/hadoop/doc/synonyms.dict");
     32             FSDataInputStream is = fs.open(path);
     33             BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
     34             String s = null;
     35             while ((s = br.readLine()) != null) {
     36                 String []synonymsEnum = s.split("→");
     37                 map.put(synonymsEnum[0], synonymsEnum[1]);
     38             }
     39             br.close();
     40         } catch (IOException e) {
     41             logger.error("TextCosine IOException!");
     42         }
     43     }
     46     public TextCosine(double common, double special) {
     47         map = new HashMap<String, String>();
     48         try {
     49             Configuration conf = new Configuration();
     50             FileSystem fs = FileSystem.get(URI.create(PATH), conf);
     51             Path path = new Path("/user/hadoop/doc/synonyms.dict");
     52             FSDataInputStream is = fs.open(path);
     53             BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
     54             String s = null;
     55             while ((s = br.readLine()) != null) {
     56                 String []synonymsEnum = s.split("→");
     57                 map.put(synonymsEnum[0], synonymsEnum[1]);
     58             }
     59             br.close();
     60         } catch (IOException e) {
     61             logger.error("TextCosine IOException!");
     62         }
     64         this.common = common;
     65         this.special = special;
     66     }
     69     public void setCommon(double common) {
     70         this.common = common;
     71     }
     74     public void setSpecial(double special) {
     75         this.special = special;
     76     }
     78     // get the word with IK Analyzer
     79     public List<ElementDict> tokenizer(String str) {
     80         List<ElementDict> list = new ArrayList<ElementDict>();
     81         IKAnalyzer analyzer = new IKAnalyzer(true);
     82         try {
     83             TokenStream stream = analyzer.tokenStream("", str);
     84             CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
     85             stream.reset();
     86             int index = -1;
     87             while (stream.incrementToken()) {
     88                 if ((index = isContain(cta.toString(), list)) >= 0) {
     89                     list.get(index).setFreq(list.get(index).getFreq() + 1);
     90                 }
     91                 else {
     92                     list.add(new ElementDict(cta.toString(), 1));
     93                 }
     94             }
     95             analyzer.close();
     96         } catch (IOException e) {
     97             e.printStackTrace();
     98         } 
     99         return list;
    100     }
    102     // assert one term is in the List
    103     public int isContain(String str, List<ElementDict> list) {
    104         for (ElementDict ed : list) {
    105             if (ed.getTerm().equals(str)) {
    106                 return list.indexOf(ed);
    107             } else if (map.get(ed.getTerm())!= null && map.get(ed.getTerm()).equals(str)) {
    108                 return list.indexOf(ed);
    109             }
    110         }
    111         return -1;
    112     }
    114     // merge the two List to align the vector
    115     public List<String> mergeTerms(List<ElementDict> list1, List<ElementDict> list2) {
    116         List<String> list = new ArrayList<String>();
    117         for (ElementDict ed : list1) {
    118             if (!list.contains(ed.getTerm())) {
    119                 list.add(ed.getTerm());
    120             } else if (!list.contains(map.get(ed.getTerm()))) {
    121                 list.add(ed.getTerm());
    122             }
    123         }
    125         for (ElementDict ed : list2) {
    126             if (!list.contains(ed.getTerm())) {
    127                 list.add(ed.getTerm());
    128             } else if (!list.contains(map.get(ed.getTerm()))) {
    129                 list.add(ed.getTerm());
    130             }
    131         }
    132         return list;
    133     }
    135     // get the max cosine 
    136     public double analysisText(List<ElementDict> list1, List<ElementDict> list2) {
    137         int len1 = list1.size();
    138         int len2 = list2.size();
    139         double ret = 0;
    140         if (len2 >= len1 * 1.5) {
    141             List<ElementDict> newList = new ArrayList<ElementDict>();
    142             for (int i = 0; i + len1 <= len2; i++) {
    143                 for (int j = 0; j < len1; j++) 
    144                     newList.add(list2.get(i+j));
    146                 newList = adjustList(newList, list2, len2, len1, i);
    147                 double tmp = analysis(list1, newList);
    148                 if (tmp > ret) 
    149                     ret = tmp;
    150             }
    151         } else if (len1 >= len2 * 1.5) {
    152             List<ElementDict> newList = new ArrayList<ElementDict>();
    153             for (int i = 0; i + len2 <= len1; i++) {
    154                 for (int j = 0; j < len2; j++)
    155                     newList.add(list1.get(i+j));
    157                 newList = adjustList(newList, list1, len1, len2, i);
    158                 double tmp = analysis(list1, newList);
    159                 if (tmp > ret) 
    160                     ret = tmp;
    161             }
    162         } else {
    163             ret = analysis(list1, list2);
    164         }
    165         return ret;
    166     }
    168     // adjust the new List with the length about the original List 
    169     public List<ElementDict> adjustList(List<ElementDict> newList, List<ElementDict> list, int lenBig, int lenSmall, int index) {
    170         int gap = lenBig -lenSmall;
    171         int size = (gap/2 > 2) ? 2: gap/2;
    172         if (index < gap/2) {
    173             for (int i = 0; i < size; i++) {
    174                 newList.add(list.get(lenSmall+index+i));
    175             }
    176         } else {
    177             for (int i = 0; i > size; i++) {
    178                 newList.add(list.get(lenBig-index-i));
    179             }
    180         }
    181         return newList;
    182     }
    184     // analysis the cosine for two vectors
    185     public double analysis(List<ElementDict> list1, List<ElementDict> list2) {
    186         List<String> list = mergeTerms(list1, list2);
    187         List<Integer> weightList1 = assignWeight(list, list1);
    188         List<Integer> weightList2 = assignWeight(list, list2);
    189         return countCosSimilarity(weightList1, weightList2);
    190     }
    192     // according the frequency to assign the weight
    193     public List<Integer> assignWeight(List<String> list, List<ElementDict> list1) {
    194         List<Integer> vecList = new ArrayList<Integer>(list.size());
    195         boolean isEqual = false;
    196         for (String str : list) {
    197             for (ElementDict ed : list1) {
    198                 if (ed.getTerm().equals(str)) {
    199                     isEqual = true;
    200                     vecList.add(new Integer(ed.getFreq()));
    201                 } else if (map.get(ed.getTerm())!= null && map.get(ed.getTerm()).equals(str)) {
    202                     isEqual = true;
    203                     vecList.add(new Integer(ed.getFreq()));
    204                 }
    205             }
    207             if (!isEqual) {
    208                 vecList.add(new Integer(0));
    209             }
    210             isEqual = false;
    211         }
    212         return vecList;
    213     }
    215     // count the cosine about the two vectors
    216     public double countCosSimilarity(List<Integer> list1, List<Integer> list2) {
    217         double countScores = 0;
    218         int element = 0;
    219         int denominator1 = 0;
    220         int denominator2 = 0;
    221         int index = -1;
    222         for (Integer it : list1) {
    223             index ++;
    224             int left = it.intValue();
    225             int right = list2.get(index).intValue();
    226             element += left * right;
    227             denominator1 += left * left;
    228             denominator2 += right * right;
    229         }
    230         try {
    231             countScores = (double)element / Math.sqrt(denominator1 * denominator2);
    232         } catch (ArithmeticException e) {
    233             e.printStackTrace();
    234         }
    235         return countScores;
    236     }
    239     public boolean isSimilarity(double param, double score) {
    240         boolean ret = false;
    241         if (score >= param)
    242             ret = true;
    243         return ret;
    244     }
    247     public boolean assertSimilarity(List<ElementDict> list1, List<ElementDict> list2) 
    248     {
    249         int len1 = list1.size();
    250         int len2 = list2.size();
    251         if (len2 >= len1 * 1.5) {
    252             List<ElementDict> newList = new ArrayList<ElementDict>();
    253             for (int i = 0; i + len1 <= len2; i++) {
    254                 for (int j = 0; j < len1; j++) 
    255                     newList.add(list2.get(i+j));
    257                 newList = adjustList(newList, list2, len2, len1, i);
    258                 if (isSimilarity(special, analysis(list1, newList))) 
    259                     return true;
    260             }
    261         } else if (len1 >= len2 * 1.5) {
    262             List<ElementDict> newList = new ArrayList<ElementDict>();
    263             for (int i = 0; i + len2 <= len1; i++) {
    264                 for (int j = 0; j < len2; j++)
    265                     newList.add(list1.get(i+j));
    267                 newList = adjustList(newList, list1, len1, len2, i);
    268                 if (isSimilarity(special, analysis(list1, newList))) 
    269                     return true;
    270             }
    271         } else {
    272             if (isSimilarity(common, analysis(list1, list2))) 
    273                 return true;
    274         }
    275         return false;
    276     }
    277 }
    Class TextCosine
     1 import java.util.Collections;
     2 import java.util.List;
     4 import org.apache.logging.log4j.LogManager;
     5 import org.apache.logging.log4j.Logger;
     7 import com.gta.cosine.TextCosine;
     8 import com.gta.cosine.ElementDict;
    10 public class DensityCenter {
    11     private Logger logger = LogManager.getLogger(DensityCenter.class);
    12     private double eps;
    13     private TextCosine cosine;
    15     public DensityCenter(double eps, TextCosine cosine) {
    16         this.eps = eps;
    17         this.cosine = cosine;
    18     }
    21     public double cosineDistance(String src, String dst) 
    22     {
    23         List<ElementDict> vec1 = cosine.tokenizer(src);
    24         List<ElementDict> vec2 = cosine.tokenizer(dst);
    25         return cosine.analysisText(vec1, vec2);
    26     }
    29     public int getNeighbors(String src, List<String> dst) {
    30         int ret = 0;
    31         double score = 0;
    32         for (String s : dst) {
    33             score = cosineDistance(src, s);
    34             if (score >= eps)
    35                 ret++;
    36         }
    37         return ret;
    38     }
    41     public String getDensityCenter(List<String> text) {
    42         int max = 0;
    43         int i = 0;
    44         int index = 0;
    45         for (String s : text) {
    46             int ret = getNeighbors(s, text);
    47             if (ret > max) {
    48                 index = i;
    49                 max = ret;
    50             }
    51             i++;
    52         }
    53         return text.get(index);
    54     }
    57     public boolean compareCenters(List<String> oldCenters, List<String> newCenters) 
    58     {
    59         boolean ret = false;
    60         Collections.sort(oldCenters);
    61         Collections.sort(newCenters);
    62         int oldSize = oldCenters.size();
    63         int newSize = newCenters.size();
    64         logger.info("oldSize : " + oldSize);
    65         logger.info("newSize : " + newSize);
    66         int size = oldSize > newSize ? newSize : oldSize;
    67         int index = 0;
    68         int count = 0;
    69         for (String s : oldCenters) {
    70             if (s.equals(newCenters.get(index))) 
    71                 count++;
    73             index++;
    74             if (index >= size)  // Avoid the size of two List is not the same
    75                 break;
    76         }
    77         logger.info("count : " + count);
    78         if (count == index)
    79             ret = true;
    81         return ret;
    82     }
    83 }
    Class DensityCenter
      1 import java.io.BufferedReader;
      2 import java.io.InputStreamReader;
      3 import java.io.IOException;
      4 import java.net.URI;
      5 import java.util.ArrayList;
      6 import java.util.List;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.FSDataInputStream;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.io.IntWritable;
     11 import org.apache.hadoop.io.NullWritable;
     12 import org.apache.hadoop.io.Text;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.logging.log4j.LogManager;
     16 import org.apache.logging.log4j.Logger;
     17 import com.gta.cosine.TextCosine;
     18 import com.gta.cosine.ElementDict;
     19 import com.gta.util.DensityCenter;
     21 public class KMeansProcess {
     23     public static class TextMapper extends Mapper<Object, Text, IntWritable, Text> {
     24         private static Logger logger = LogManager.getLogger(TextMapper.class);
     25         public static List<String> centersList = new ArrayList<String>();
     26         public static TextCosine cosine = new TextCosine();
     28         public void setup(Context context)
     29         {
     30             int iteration = context.getConfiguration().getInt("ITERATION", 100);
     31             if (iteration == 0) {
     32                 int task = context.getConfiguration().getInt("TASK", 0);
     33                 try {
     34                     URI[] caches = context.getCacheFiles();
     35                     if (caches == null || caches.length <= 0) {
     36                         System.exit(1);
     37                     }
     38                     for (int i = 0; i < task; i++) {
     39                         FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
     40                         FSDataInputStream is = fs.open(new Path(caches[i].toString()));
     41                         BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
     42                         String s = null;
     43                         while ((s = br.readLine()) != null) {
     44                             centersList.add(s);    
     45                         }
     46                         br.close();
     47                     }
     48                 } catch (IOException e) {
     49                     logger.error(e.getMessage());
     50                 }
     51             }
     52         }
     55         public void map(Object key, Text value, Context context)  
     56         {
     57             try {
     58                 String str = value.toString();
     59                 double score = 0;
     60                 double countTmp = 0;
     61                 int clusterID = 0;
     62                 int index = 0;
     63                 List<ElementDict> vec1 = cosine.tokenizer(str);
     64                 for (String s : centersList) {
     65                     List<ElementDict> vec2 = cosine.tokenizer(s);
     66                     countTmp = cosine.analysisText(vec1, vec2);
     67                     if (countTmp > score) {
     68                         clusterID = index;
     69                         score = countTmp;
     70                     }
     71                     index++;
     72                 }
     73                 context.write(new IntWritable(clusterID), new Text(str));
     74             } catch (IOException e) {
     75                 logger.error(e.getMessage());
     76             } catch (InterruptedException e) {
     77                 logger.error(e.getMessage());
     78             }
     79         }
     80     }
     83     public static class TextReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
     84         private static Logger logger = LogManager.getLogger(TextReducer.class);
     85         public static DensityCenter center = new DensityCenter(0.75, KMeansProcess.TextMapper.cosine);
     87         public void reduce(IntWritable key, Iterable<Text> values, Context context) {
     88             try {
     89                 List<String> list = new ArrayList<String>();
     90                 for (Text val : values) {
     91                     list.add(val.toString());
     92                 }
     93                 context.write(NullWritable.get(), new Text(center.getDensityCenter(list)));
     94             } catch (IOException e) {
     95                 logger.error(e.getMessage());
     96             } catch (InterruptedException e) {
     97                 logger.error(e.getMessage());
     98             }
     99         }
    100     }
    101 }
    Class KMeansProcess
     1 import java.io.BufferedReader;
     2 import java.io.IOException;
     3 import java.io.InputStreamReader;
     4 import java.net.URI;
     5 import java.util.List;
     6 import org.apache.hadoop.fs.FSDataInputStream;
     7 import org.apache.hadoop.fs.FileSystem;
     8 import org.apache.hadoop.fs.Path;
     9 import org.apache.hadoop.io.IntWritable;
    10 import org.apache.hadoop.io.Text;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 import org.apache.hadoop.mapreduce.Reducer;
    13 import org.apache.logging.log4j.LogManager;
    14 import org.apache.logging.log4j.Logger;
    15 import com.gta.cosine.TextCosine;
    16 import com.gta.cosine.ElementDict;
    18 public class KMeans {
    20     public static class KMeansMapper extends Mapper<Object, Text, IntWritable, Text> {
    21         private List<String> centersList = KMeansProcess.TextMapper.centersList;
    22         private static Logger logger = LogManager.getLogger(KMeans.KMeansMapper.class);
    23         private TextCosine cosine = KMeansProcess.TextMapper.cosine;
    25         public void setup(Context context)
    26         {
    27             int task = context.getConfiguration().getInt("TASK", 0);
    28             try {
    29                 URI[] caches = context.getCacheFiles();
    30                 if (caches == null || caches.length <= 0) {
    31                     System.exit(1);
    32                 }
    33                 for (int i = 0; i < task; i++) {
    34                     FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
    35                     FSDataInputStream is = fs.open(new Path(caches[i].toString()));
    36                     BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    37                     String s = null;
    38                     while ((s = br.readLine()) != null)
    39                         centersList.add(s);
    40                     br.close();
    41                 }
    42             } catch (IOException e) {
    43                 logger.error(e.getMessage());
    44             }
    45         }
    48         public void map(Object key, Text value, Context context) {
    49             try {
    50                 String str = value.toString();
    51                 double score = 0;
    52                 double countTmp = 0;
    53                 int clusterID = 0;
    54                 int index = 0;
    55                 List<ElementDict> vec1 = cosine.tokenizer(str);
    56                 for (String s : centersList) {
    57                     List<ElementDict> vec2 = cosine.tokenizer(s);
    58                     countTmp = cosine.analysisText(vec1, vec2);
    59                     if (countTmp > score) {
    60                         clusterID = index;
    61                         score = countTmp;
    62                     }
    63                     index++;
    64                 }
    65                 context.write(new IntWritable(clusterID), new Text(str));
    66             } catch (IOException e) {
    67                 logger.error(e.getMessage());
    68             } catch (InterruptedException e) {
    69                 logger.error(e.getMessage());
    70             }
    71         }
    74         public void cleanup(Context context)
    75         {
    76             centersList.clear();
    77         }
    78     }
    81     public static class KMeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    82         private static Logger logger = LogManager.getLogger(KMeans.KMeansReducer.class);
    84         public void ruduce(IntWritable key, Iterable<Text> values, Context context) {
    85             try {
    86                 for (Text val : values) {
    87                     context.write(key, val);
    88                 }
    89             } catch (IOException e) {
    90                 logger.error(e.getMessage());
    91             } catch (InterruptedException e) {
    92                 logger.error(e.getMessage());
    93             }
    94         }
    95     }
    97 }
    Class KMeans
      1 import java.io.BufferedReader;
      2 import java.io.IOException;
      3 import java.io.InputStreamReader;
      4 import java.util.List;
      5 import java.util.ArrayList;
      6 import java.net.URI;
      7 import org.apache.hadoop.conf.Configuration;
      8 import org.apache.hadoop.fs.FSDataInputStream;
      9 import org.apache.hadoop.fs.FileSystem;
     10 import org.apache.hadoop.fs.Path;
     11 import org.apache.hadoop.io.IntWritable;
     12 import org.apache.hadoop.io.Text;
     13 import org.apache.hadoop.mapreduce.Job;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
     16 import com.gta.cluster.KMeans.KMeansMapper;
     17 import com.gta.cluster.KMeans.KMeansReducer;
     18 import com.gta.cluster.KMeansProcess.TextMapper;
     19 import com.gta.cluster.KMeansProcess.TextReducer;
     20 import org.apache.logging.log4j.LogManager;
     21 import org.apache.logging.log4j.Logger;
     23 public class Cluster {
     24     public static final int MAX = 50;
     25     public static final String INPUT_PATH = "hdfs://";
     26     public static final String OUTPUT_PATH = "hdfs://";
     27     public static final String TMP_PATH = "hdfs://";
     28     public static final int TASK = 5;
     29     public static Logger logger = LogManager.getLogger(Cluster.class);
     30     private Configuration conf;
     31     private int iteration = 0;
     33     public Cluster() 
     34     {
     35         this.conf = new Configuration();
     36         conf.setInt("TASK", TASK);
     37     }
     40     public void run() throws IOException, InterruptedException, ClassNotFoundException
     41     {
     42         while (iteration < MAX) {
     43             logger.info("次数   :  " + (iteration+1));
     44             conf.setInt("ITERATION", iteration);
     45             Job job = Job.getInstance(conf, "KMeans Process");
     46             if (iteration == 0) {
     47                 String cacheFile = TMP_PATH + iteration + "/part-r-0000";
     48                 for (int i = 0; i < TASK; i++)
     49                     job.addCacheFile(URI.create(cacheFile+i));
     50             }
     51             job.setJarByClass(KMeansProcess.class);
     52             job.setMapperClass(TextMapper.class);
     53             job.setNumReduceTasks(TASK);
     54             job.setReducerClass(TextReducer.class);
     55             job.setOutputKeyClass(IntWritable.class);
     56             job.setOutputValueClass(Text.class);
     57             iteration++;
     58             String outFile = TMP_PATH + iteration;
     59             FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
     60             FileOutputFormat.setOutputPath(job, new Path(outFile));
     61             job.waitForCompletion(true);
     62             conf.unset("ITERATION");
     63             List<String> tmpList = getCenterList(outFile);
     64             if (KMeansProcess.TextReducer.center.compareCenters(KMeansProcess.TextMapper.centersList, tmpList)) 
     65                 break;
     66             else {
     67                 KMeansProcess.TextMapper.centersList.clear();
     68                 for (String s : tmpList) {
     69                     KMeansProcess.TextMapper.centersList.add(s);
     70                 }
     71             }
     72         }
     73     }
     76     public void lastRun() throws IOException, InterruptedException, ClassNotFoundException
     77     {
     78         String cacheFile = TMP_PATH + iteration + "/part-r-0000";
     79         Job job = Job.getInstance(conf, "KMeans");
     80         for (int i = 0; i < TASK; i++)
     81             job.addCacheFile(URI.create(cacheFile+i));
     82         job.setJarByClass(KMeans.class);
     83         job.setMapperClass(KMeansMapper.class);
     84         job.setReducerClass(KMeansReducer.class);
     85         job.setOutputKeyClass(IntWritable.class);
     86         job.setOutputValueClass(Text.class);    
     87         FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
     88         FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
     89         job.waitForCompletion(true);
     90     }
     93     public List<String> getCenterList(String outFile) 
     94     {
     95         List<String> centerList = new ArrayList<String>();
     96         String fileName = outFile + "/part-r-0000";
     97         try {
     98             for (int i = 0; i < TASK; i++) {
     99                 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf);
    100                 FSDataInputStream is = fs.open(new Path((fileName+i).toString()));
    101                 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    102                 String s = null;
    103                 while ((s = br.readLine()) != null)
    104                     centerList.add(s);
    105                 br.close();
    106             }
    107         } catch (IOException e) {
    108             logger.info(e.getMessage());
    109         }
    111         return centerList;
    112     }
    115     public static void main(String[] args) {
    116         Cluster cluster = new Cluster();     
    117         try {
    118             long start = System.currentTimeMillis();
    119             cluster.run();
    120             cluster.lastRun();
    121             long end = System.currentTimeMillis();
    122             Cluster.logger.info(end-start);
    123         } catch (ClassNotFoundException e) {
    124             e.printStackTrace();
    125         } catch (IOException e) {
    126             e.printStackTrace();
    127         } catch (InterruptedException e) {
    128             e.printStackTrace();
    129         }                
    130     }
    131 }
    Class Cluster






  • 相关阅读:
    Matlab---读取 .txt文件
    day 28 黏包及黏包解决方案
    day 27
    day 26 网络知识 01
    day 25 模块与包
  • 原文地址:https://www.cnblogs.com/lyssym/p/4954023.html
Copyright © 2011-2022 走看看