思路
- 研究特征
- 所有的微博变成词袋
- 一条微博形成一条向量,数量代表出现的次数
- 使用TF-IDF计算词的重要性
- 选取主要的重要的词进行KMeans聚类
- 筛选出的N个次就能很好的代表当前类的主旨思想,可以给与到网络营销部分
代码
package com.test import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ListBuffer import org.apache.lucene.analysis.TokenStream import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF import org.apache.spark.mllib.feature.IDFModel import org.apache.spark.rdd.RDD import org.wltea.analyzer.lucene.IKAnalyzer import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.clustering.KMeansModel object Test02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("BlogMeans").setMaster("local[*]") val sc = new SparkContext(conf) val rdd = sc.textFile("data/original.txt") var wordRDD: RDD[(String, ArrayBuffer[String])] = rdd.mapPartitions(iterator => { val list = new ListBuffer[(String, ArrayBuffer[String])] while (iterator.hasNext) { //创建分词对象 IKAnalyzer支持两种分词模式:最细粒度和智能分词模式,如果构造函数参数为false,那么使用最细粒度分词。 val analyzer = new IKAnalyzer(true) val line = iterator.next() val textArr = line.split(" ") val id = textArr(0) val text = textArr(1) //分词 第一个参数只是标识性,没有实际作用,第二个读取的数据 val ts: TokenStream = analyzer.tokenStream("", text) // 得到相应词汇的内容 val term: CharTermAttribute = ts.getAttribute(classOf[CharTermAttribute]) //重置分词器,使得tokenstream可以重新返回各个分词 ts.reset() val arr = new ArrayBuffer[String]() //遍历分词数据 while (ts.incrementToken()) { arr.+=(term.toString) } list.append((id, arr)) analyzer.close() } list.iterator }) /** * action算子之后,才会将wordRDD 写入到内存中 * 相当于 persist() */ wordRDD = wordRDD.cache() /** * 1000:生成长度为1000的向量 * 带来新的问题: hash碰撞的问题,(效率和碰撞问题取中间) * * 为了提高训练模型的效率,向量长度(特征个数)设置为1000个 * * 向量不同的位置的值与每篇微博的单词并不是一一对应的 */ val hashingTF: HashingTF = new HashingTF(1000) /** * hashingTF.transform(x._2) * 按照hashingTF规则 计算分词频数(TF) */ val tfRDD: RDD[(String, Vector)] = wordRDD.map(x => { (x._1, hashingTF.transform(x._2)) }) /** * * * tfRDD * * K:微博ID * * V:Vector(tf,tf,tf.....) */ val idf: IDFModel = new IDF().fit(tfRDD.map(_._2)) /** * K:微博 ID * V:每一个单词的TF-IDF值 * tfIdfs这个RDD中的Vector就是训练模型的训练集 * dfIdfs: 计算TFIDF值 */ val tfIdfs: RDD[(String, Vector)] = tfRDD.mapValues(idf.transform(_)) //设置聚类个数 val kCluster = 20 val kMeans = new KMeans() kMeans.setK(kCluster) //使用的是kemans++算法来训练模型 "random" "k-means||" 代表 kmeans kmeans++ kMeans.setInitializationMode("k-means||") // 设置最大迭代次数,本次中心的坐标 kMeans.setMaxIterations(1000) //训练模型 聚类的模型(k个中心点的坐标) 线性回归(w0,w1) val kmeansModel: KMeansModel = kMeans.run(tfIdfs.map(_._2)) // 打印模型的20个中心点 println(kmeansModel.clusterCenters) /** * 模型预测 */ val modelBroadcast = sc.broadcast(kmeansModel) /** * predicetionRDD KV格式的RDD * K:微博ID V:分类号 */ val predicetionRDD: RDD[(String, Int)] = tfIdfs.mapValues(vetor => { val model = modelBroadcast.value model.predict(vetor) }) /** * 总结预测结果 * tfIdfs2wordsRDD:kv格式的RDD * K:微博ID * V:二元组(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....)) */ val tfIdfs2wordsRDD: RDD[(String, (Vector, ArrayBuffer[String]))] = tfIdfs.join(wordRDD) /** * result:KV * K:微博ID * V:(类别号,(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....))) */ val result: RDD[(String, (Int, (Vector, ArrayBuffer[String])))] = predicetionRDD.join(tfIdfs2wordsRDD) result.filter(x => x._2._1 == 2).flatMap(line => { val tfIdfV: Vector = line._2._2._1 val words: ArrayBuffer[String] = line._2._2._2 val list = new ListBuffer[(Double, String)] for (i <- 0 until words.length) { list.append((tfIdfV(hashingTF.indexOf(words(i))), words(i))) } list }).sortBy(x => x._1, false) .map(_._2) .distinct() .take(30) .foreach(println) sc.stop() } }
pom
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>2.3.4</version> </dependency> <dependency> <groupId>com.janeluo</groupId> <artifactId>ikanalyzer</artifactId> <version>2012_u6</version> </dependency>