wget http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream.xml.bz2
1 获取数据
/**
 * Reads an XML dump and returns one string per `<page>` ... `</page>` record.
 *
 * @param path input location handed to `SparkContext.newAPIHadoopFile`
 * @param sc   Spark context used to create the RDD
 * @return an RDD holding the text of every record produced by `XmlInputFormat`
 */
def readFile(path: String, sc: SparkContext): RDD[String] = {
  val conf = new Configuration()
  // Record delimiters handed to XmlInputFormat.
  conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
  conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
  val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable],
    classOf[Text], conf)
  // Only the value of each (LongWritable, Text) record is kept, converted to a String.
  rawXmls.map { case (_, xml) => xml.toString }
}
/**
 * Parses one `<page>` XML record into a (title, content) pair.
 *
 * @param pageXml raw XML of a single page
 * @return `Some((title, content))` for a page that is a non-empty article, is not a redirect
 *         and whose title does not contain "(disambiguation)"; `None` otherwise
 */
def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
  val page = new EnglishWikipediaPage()
  WikipediaPage.readPage(page, pageXml)
  val skip = page.isEmpty || !page.isArticle || page.isRedirect ||
    page.getTitle.contains("(disambiguation)")
  // The rejected branch must yield None so that both branches agree on Option[(String, String)].
  if (skip) None else Some((page.getTitle, page.getContent))
}
// Load the dump from HDFS and keep a sample of its pages: no replacement, fraction `sampleSize`,
// fixed seed 11L.
val pages = readFile("hdfs:///user/ds/Wikipedia/", sc)
  .sample(withReplacement = false, fraction = sampleSize, seed = 11L)
// Drop null records, then keep only the pages for which wikiXmlToPlainText yields a value.
val plainText = pages.filter(xml => xml != null).flatMap(wikiXmlToPlainText)
2 词形归并
/**
 * Builds a Stanford CoreNLP pipeline configured with the annotators
 * "tokenize, ssplit, pos, lemma".
 *
 * @return a new `StanfordCoreNLP` instance created from those properties
 */
def createNLPPipeline(): StanfordCoreNLP = {
  val props = new Properties()
  // setProperty is the String-typed entry point of java.util.Properties; it avoids the
  // Object-typed Hashtable.put overloads.
  props.setProperty("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}
/**
 * Tells whether a string consists solely of letters.
 *
 * @param str the string to inspect
 * @return true when every character satisfies `Character.isLetter`; an empty string
 *         therefore yields true
 */
def isOnlyLetters(str: String): Boolean = {
  str.forall(c => Character.isLetter(c))
}
/**
 * Builds TF-IDF row vectors for a collection of documents.
 *
 * @param docs      (document key, terms) pairs
 * @param stopWords not referenced by this function; kept so existing callers still compile
 * @param numTerms  forwarded to `documentFrequenciesDistributed`
 * @param sc        Spark context used to broadcast the lookup tables
 * @return a tuple of (one sparse TF-IDF vector per document, vector index -> term,
 *         document id -> document key, term -> inverse document frequency)
 */
def documentTermMatrix(docs: RDD[(String, Seq[String])], stopWords: Set[String], numTerms: Int,
    sc: SparkContext): (RDD[Vector], Map[Int, String], Map[Long, String], Map[String, Double]) = {
  // Term counts per document; the document key is preserved by mapValues.
  val docTermFreqs = docs.mapValues { terms =>
    terms.foldLeft(new HashMap[String, Int]()) { (counts, term) =>
      counts += term -> (counts.getOrElse(term, 0) + 1)
    }
  }

  val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap()
  val docFreqs = documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
  println("Number of terms: " + docFreqs.size)
  saveDocFreqs("docfreqs.tsv", docFreqs)
  val numDocs = docIds.size
  val idfs = inverseDocumentFrequencies(docFreqs, numDocs)

  // Maps terms to their indices in the vector
  val idTerms = idfs.keys.zipWithIndex.toMap
  val termIds = idTerms.map(_.swap)

  // Keep the Broadcast handles and dereference them inside the task. Calling `.value` here on
  // the driver would capture the plain maps in the closure and ship them with every task.
  val bIdfs = sc.broadcast(idfs)
  val bIdTerms = sc.broadcast(idTerms)

  val vecs = docTermFreqs.map(_._2).map { termFreqs =>
    val termIndex = bIdTerms.value
    val idfByTerm = bIdfs.value
    val docTotalTerms = termFreqs.values.sum
    // Score only the terms that were assigned a vector index: tf * idf / document length.
    val termScores = termFreqs.filter {
      case (term, _) => termIndex.contains(term)
    }.map {
      case (term, freq) => (termIndex(term), idfByTerm(term) * freq / docTotalTerms)
    }.toSeq
    Vectors.sparse(termIndex.size, termScores)
  }
  (vecs, termIds, docIds, idfs)
}
/**
 * Counts, for every term, the number of documents it appears in.
 *
 * @param docTermFreqs one term -> count map per document
 * @return term -> number of documents containing that term
 */
def documentFrequencies(docTermFreqs: RDD[HashMap[String, Int]]): HashMap[String, Int] = {
  val zero = new HashMap[String, Int]()

  // Folds one document into the running totals: +1 for each distinct term of the document.
  def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
      : HashMap[String, Int] = {
    tfs.keySet.foreach { term =>
      dfs += term -> (dfs.getOrElse(term, 0) + 1)
    }
    dfs
  }

  // Adds the totals of dfs2 into dfs1 and returns dfs1.
  def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
      : HashMap[String, Int] = {
    for ((term, count) <- dfs2) {
      dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
    }
    dfs1
  }

  docTermFreqs.aggregate(zero)(merge, comb)
}
/**
 * Computes document frequencies with a distributed reduce and keeps the most frequent terms.
 *
 * @param docTermFreqs one term -> count map per document
 * @param numTerms     maximum number of (term, document frequency) pairs to return
 * @return up to `numTerms` pairs with the largest document frequency
 */
def documentFrequenciesDistributed(docTermFreqs: RDD[HashMap[String, Int]], numTerms: Int)
    : Array[(String, Int)] = {
  // Each document contributes 1 per distinct term; the reduce uses 15 partitions.
  val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)
  // Rank pairs by their count only.
  val ordering = Ordering.by[(String, Int), Int](_._2)
  docFreqs.top(numTerms)(ordering)
}
/**
 * Drops the least frequent entries, keeping at most `numToKeep` of the most frequent ones.
 *
 * @param freqs     term -> frequency
 * @param numToKeep maximum number of entries to retain
 * @return the `numToKeep` entries with the highest frequency (all entries when there are fewer)
 */
def trimLeastFrequent(freqs: Map[String, Int], numToKeep: Int): Map[String, Int] = {
  // Sort by descending frequency so that `take` retains the most frequent terms; an ascending
  // sort would keep exactly the entries this function is meant to trim.
  freqs.toArray.sortBy(_._2)(Ordering[Int].reverse).take(numToKeep).toMap
}
/**
 * Converts document frequencies into inverse document frequencies.
 *
 * @param docFreqs (term, number of documents containing the term) pairs
 * @param numDocs  total number of documents
 * @return term -> log(numDocs / documentFrequency), using the natural logarithm
 */
def inverseDocumentFrequencies(docFreqs: Array[(String, Int)], numDocs: Int)
    : Map[String, Double] = {
  docFreqs.map { case (term, count) =>
    // toDouble forces floating-point division.
    term -> math.log(numDocs.toDouble / count)
  }.toMap
}
4 奇异值分解
// Wrap the row vectors in a RowMatrix and compute its SVD with k singular values,
// requesting the U factor as well.
val mat = new RowMatrix(termDocMatrix)
val svd = mat.computeSVD(k, computeU = true)
/**
 * Lists the highest-weighted terms of the first `numConcepts` concepts.
 *
 * @param svd         decomposition whose `V` matrix is read
 * @param numConcepts number of concepts (blocks of `V.numRows` values) to inspect
 * @param numTerms    number of terms to report per concept
 * @param termIds     vector index -> term
 * @return one sequence per concept of (term, weight) pairs, ordered by descending weight
 */
def topTermsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
    numTerms: Int, termIds: Map[Int, String]): Seq[Seq[(String, Double)]] = {
  val v = svd.V
  val arr = v.toArray
  (0 until numConcepts).map { i =>
    // Concept i occupies the contiguous range [i * numRows, (i + 1) * numRows) of the flat array.
    val offs = i * v.numRows
    val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
    val sorted = termWeights.sortBy(-_._1)
    sorted.take(numTerms).map { case (score, id) => (termIds(id), score) }.toSeq
  }
}
/**
 * Lists the highest-weighted documents of the first `numConcepts` concepts.
 *
 * @param svd         decomposition whose `U` matrix is read
 * @param numConcepts number of concepts (columns of `U`) to inspect
 * @param numDocs     number of documents to report per concept
 * @param docIds      document id -> document key; the ids are looked up with the values
 *                    produced by `zipWithUniqueId` on the rows of `U`
 * @return one sequence per concept of (document key, weight) pairs, highest weight first
 */
def topDocsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
    numDocs: Int, docIds: Map[Long, String]): Seq[Seq[(String, Double)]] = {
  val u = svd.U
  (0 until numConcepts).map { i =>
    // Pair the i-th entry of every row with that row's unique id.
    val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
    // `top` on (weight, id) pairs compares the weight first.
    docWeights.top(numDocs).map { case (score, id) => (docIds(id), score) }.toSeq
  }
}
// Report the 10 strongest terms and the 10 strongest documents of the first 10 concepts.
val topConceptTerms = topTermsInTopConcepts(svd, 10, 10, termIds)
val topConceptDocs = topDocsInTopConcepts(svd, 10, 10, docIds)
// Both sequences are produced per concept, so zipping pairs the terms and docs of one concept.
for ((terms, docs) <- topConceptTerms.zip(topConceptDocs)) {
  println("Concept terms: " + terms.map(_._1).mkString(", "))
  println("Concept docs: " + docs.map(_._1).mkString(", "))
}
5 相关度
import breeze.linalg.{DenseMatrix => BDenseMatrix, DenseVector => BDenseVector,
SparseVector => BSparseVector}
// NOTE(review): only the signature of topTermsForTerm is present here; its body is not shown.
// From the signature alone: it takes a Breeze dense matrix and a term id and returns
// (Double, Int) pairs - presumably (score, term id); confirm against the full implementation.
def topTermsForTerm(normalizedVS: BDenseMatrix[Double], termId: Int): Seq[(Double, Int)] = {
/**
 * Builds a sparse query vector holding the IDF weight of each query term.
 *
 * @param terms   query terms; repeated terms are counted once
 * @param idTerms term -> vector index
 * @param idfs    term -> inverse document frequency
 * @return a sparse vector of length `idTerms.size` with `idfs(term)` at `idTerms(term)`
 * @throws NoSuchElementException if a term is missing from `idTerms` or `idfs`
 */
def termsToQueryVector(terms: Seq[String], idTerms: Map[String, Int], idfs: Map[String, Double])
    : BSparseVector[Double] = {
  // Breeze sparse vectors expect a strictly increasing index array, so drop repeated terms
  // and order the entries by index before constructing the vector.
  val entries = terms.distinct.map(term => (idTerms(term), idfs(term))).sortBy(_._1)
  val (indices, values) = entries.unzip
  new BSparseVector[Double](indices.toArray, values.toArray, idTerms.size)
}
def topDocsForTermQuery(US: RowMatrix, V: Matrix, query: BSparseVector[Double])
: Seq[(Double, Long)] = {
val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray)
val termRowArr = (breezeV.t * query).toArray
val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)