  • Spark高级数据分析 · 第6章 LSA（潜在语义分析）


    # Download the latest English Wikipedia pages-articles dump (multistream, bzip2-compressed XML).
    # NOTE(review): the code below reads from hdfs:///user/ds/Wikipedia/, so the dump presumably
    # has to be copied (and possibly decompressed) into that HDFS directory first — confirm.
    wget http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream.xml.bz2

    1 获取数据

    /**
     * Reads an XML dump and splits it into one record per `<page>...</page>` element.
     *
     * @param path Hadoop-compatible input path of the XML dump
     * @param sc   active SparkContext
     * @return an RDD holding the raw XML text of each page
     */
    def readFile(path: String, sc: SparkContext): RDD[String] = {
      val conf = new Configuration()
      conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
      conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
      val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable],
        classOf[Text], conf)
      // Hadoop record readers reuse the same Writable instance for every record, so copy each
      // value into an immutable String right away; the LongWritable key is not needed.
      rawXmls.map { case (_, xml) => xml.toString }
    }
    /**
     * Parses the XML of one Wikipedia page and returns its (title, content) pair.
     *
     * Pages that are empty, are not articles, are redirects, or whose title contains
     * "(disambiguation)" are skipped.
     *
     * @param pageXml raw XML of a single `<page>` element
     * @return `Some((title, content))` for a regular article, `None` otherwise
     */
    def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
      val page = new EnglishWikipediaPage()
      WikipediaPage.readPage(page, pageXml)
      // Short-circuit order matters: the title is only inspected for non-empty articles.
      val skip = page.isEmpty || !page.isArticle || page.isRedirect ||
        page.getTitle.contains("(disambiguation)")
      // Return None explicitly: an empty branch evaluates to Unit, so the if/else would not be an Option.
      if (skip) None else Some((page.getTitle, page.getContent))
    }
    // Split the dump into raw <page> XML records and keep a reproducible random sample of them
    // (sampling without replacement, fixed seed 11).
    val pages = readFile("hdfs:///user/ds/Wikipedia/", sc).sample(
      withReplacement = false, fraction = sampleSize, seed = 11L)
    // Drop null records, then parse each page; skipped pages come back as None and disappear.
    val plainText = pages
      .filter(xml => xml != null)
      .flatMap(wikiXmlToPlainText)

    2 词形归并

    /**
     * Builds a Stanford CoreNLP pipeline running the tokenize, ssplit, pos and lemma annotators.
     *
     * @return a new pipeline instance (callers in this file create one per Spark partition)
     */
    def createNLPPipeline(): StanfordCoreNLP = {
      val props = new Properties()
      // setProperty (instead of the inherited Hashtable.put) keeps Properties String-to-String.
      props.setProperty("annotators", "tokenize, ssplit, pos, lemma")
      new StanfordCoreNLP(props)
    }
    /**
     * Returns true when every code point of `str` is a Unicode letter (vacuously true for "").
     *
     * Iterates by code point rather than by Char, so letters outside the Basic Multilingual
     * Plane (stored as surrogate pairs) are accepted instead of being rejected.
     */
    def isOnlyLetters(str: String): Boolean = {
      // Plain while loop: this runs once per token, so it stays allocation-free.
      var i = 0
      var allLetters = true
      while (allLetters && i < str.length) {
        val cp = str.codePointAt(i)
        allLetters = Character.isLetter(cp)
        i += Character.charCount(cp)
      }
      allLetters
    }
    /**
     * Lemmatizes `text` with the given CoreNLP pipeline and keeps only useful lemmas.
     *
     * A lemma is kept when it is longer than two characters, is not in `stopWords`, and consists
     * only of letters; kept lemmas are lower-cased.
     *
     * @param text      raw document text
     * @param stopWords lemmas to discard (matched before lower-casing)
     * @param pipeline  pipeline providing sentence splitting, tokenization and lemmas
     * @return the kept lemmas in document order
     */
    def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP)
      : Seq[String] = {
      val doc = new Annotation(text)
      // Run the annotators; without this the Annotation carries no sentences and get(...) is null.
      pipeline.annotate(doc)
      val lemmas = new ArrayBuffer[String]()
      val sentences = doc.get(classOf[SentencesAnnotation])
      for (sentence <- sentences.asScala;
           token <- sentence.get(classOf[TokensAnnotation]).asScala) {
        val lemma = token.get(classOf[LemmaAnnotation])
        if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
          // Locale.ROOT: the default-locale toLowerCase mangles e.g. "I" under a Turkish locale.
          lemmas += lemma.toLowerCase(java.util.Locale.ROOT)
        }
      }
      lemmas
    }
    // Load the stop-word list on the driver and broadcast it once to the executors.
    val stopWords = loadStopWords("stopwords.txt")
    val bStopWords = sc.broadcast(stopWords)
    val lemmatized = plainText.mapPartitions(iter => {
      // One CoreNLP pipeline per partition, shared by all documents in it.
      val pipeline = createNLPPipeline()
      // Dereference the broadcast inside the task; calling .value on the driver would make the
      // closure capture the plain Set and ship it with every task, defeating the broadcast.
      val localStopWords = bStopWords.value
      iter.map { case (title, contents) =>
        (title, plainTextToLemmas(contents, localStopWords, pipeline))
      }
    })

    3 TF-IDF

    /**
     * Turns lemmatized documents into TF-IDF vectors over the `numTerms` terms with the highest
     * document frequency.
     *
     * @param docs      (title, lemmas) pairs
     * @param stopWords not used by this implementation (kept for interface compatibility)
     * @param numTerms  vocabulary size: number of highest document-frequency terms to keep
     * @param sc        SparkContext used to broadcast the lookup tables
     * @return (one sparse TF-IDF vector per document, term index -> term,
     *         document id -> title, term -> IDF)
     */
    def documentTermMatrix(docs: RDD[(String, Seq[String])], stopWords: Set[String], numTerms: Int,
        sc: SparkContext): (RDD[Vector], Map[Int, String], Map[Long, String], Map[String, Double]) = {
      val docTermFreqs = docs.mapValues { terms =>
        terms.foldLeft(new HashMap[String, Int]()) {
          (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
        }
      }
      // docTermFreqs feeds three separate jobs (ids, document frequencies, vectors); cache it so
      // the upstream lemmatization is not recomputed for each of them.
      docTermFreqs.cache()
      val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap()
      val docFreqs = documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
      println("Number of terms: " + docFreqs.size)
      saveDocFreqs("docfreqs.tsv", docFreqs)
      val numDocs = docIds.size
      val idfs = inverseDocumentFrequencies(docFreqs, numDocs)
      // Maps terms to their indices in the vector
      val idTerms = idfs.keys.zipWithIndex.toMap
      val termIds = idTerms.map(_.swap)
      // Keep the Broadcast handles and call .value inside the task; calling .value on the driver
      // would capture the plain maps in the closure and ship them with every task.
      val bIdfs = sc.broadcast(idfs)
      val bIdTerms = sc.broadcast(idTerms)
      val vecs = docTermFreqs.map(_._2).map { termFreqs =>
        val idfOf = bIdfs.value
        val indexOf = bIdTerms.value
        val docTotalTerms = termFreqs.values.sum
        // Terms outside the kept vocabulary are dropped; score = idf * (term count / doc length).
        val termScores = termFreqs.toSeq.collect {
          case (term, freq) if indexOf.contains(term) =>
            (indexOf(term), idfOf(term) * freq / docTotalTerms)
        }
        Vectors.sparse(indexOf.size, termScores)
      }
      (vecs, termIds, docIds, idfs)
    }
     /**
      * Counts, for every term, the number of documents containing it, using RDD.aggregate.
      * The complete term -> count map is returned to the driver.
      *
      * @param docTermFreqs per-document term-frequency maps
      * @return term -> number of documents in which it occurs
      */
     def documentFrequencies(docTermFreqs: RDD[HashMap[String, Int]]): HashMap[String, Int] = {
       val zero = new HashMap[String, Int]()
       // Folds one document into the accumulator: each distinct term counts once per document.
       def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
         : HashMap[String, Int] = {
         tfs.keySet.foreach { term =>
           dfs += term -> (dfs.getOrElse(term, 0) + 1)
         }
         dfs
       }
       // Adds one partial result into another; only the first (accumulator) map is mutated,
       // which RDD.aggregate permits.
       def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
         : HashMap[String, Int] = {
         for ((term, count) <- dfs2) {
           dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
         }
         dfs1
       }
       docTermFreqs.aggregate(zero)(merge, comb)
     }
     /**
      * Computes document frequencies with a distributed reduceByKey and returns only the
      * `numTerms` terms that occur in the most documents.
      *
      * @param docTermFreqs  per-document term-frequency maps
      * @param numTerms      number of terms to return
      * @param numPartitions partitions used by the reduceByKey shuffle (default 15)
      * @return (term, document frequency) pairs, highest document frequency first
      */
     def documentFrequenciesDistributed(docTermFreqs: RDD[HashMap[String, Int]], numTerms: Int,
         numPartitions: Int = 15): Array[(String, Int)] = {
       val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, numPartitions)
       val ordering = Ordering.by[(String, Int), Int](_._2)
       // top() returns the largest elements under `ordering`, i.e. the most widespread terms.
       docFreqs.top(numTerms)(ordering)
     }
     /**
      * Drops the least frequent terms, keeping the `numToKeep` terms with the highest counts.
      *
      * @param freqs     term -> frequency
      * @param numToKeep maximum number of terms to keep; all terms are kept if it exceeds the map size
      * @return the `numToKeep` most frequent entries of `freqs`
      */
     def trimLeastFrequent(freqs: Map[String, Int], numToKeep: Int): Map[String, Int] = {
       // Descending by frequency: an ascending sort would keep the least frequent terms instead.
       freqs.toArray.sortBy(_._2)(Ordering[Int].reverse).take(numToKeep).toMap
     }
     /**
      * Computes the inverse document frequency log(numDocs / df) of each term.
      *
      * @param docFreqs (term, number of documents containing it) pairs; counts are expected to be
      *                 positive, since a zero count yields an infinite IDF
      * @param numDocs  total number of documents in the corpus
      * @return term -> IDF
      */
     def inverseDocumentFrequencies(docFreqs: Array[(String, Int)], numDocs: Int)
       : Map[String, Double] = {
       docFreqs.map { case (term, count) => (term, math.log(numDocs.toDouble / count)) }.toMap
     }

    4 奇异值分解

    // Wrap the document vectors in a distributed RowMatrix and keep the top k singular values,
    // also materializing U (needed later to score documents).
    // NOTE(review): computeSVD makes several passes over the rows; presumably termDocMatrix
    // should be cached before this point — confirm.
    val mat = new RowMatrix(termDocMatrix)
    val svd = mat.computeSVD(k, computeU = true)
    /**
     * For each of the first `numConcepts` concepts (columns of V), returns the `numTerms` terms
     * with the largest weight, highest first.
     *
     * @param svd         decomposition whose local matrix V has one row per term
     * @param numConcepts number of concepts to report
     * @param numTerms    number of terms per concept
     * @param termIds     term index -> term
     * @return one Seq of (term, weight) pairs per concept
     */
    def topTermsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
        numTerms: Int, termIds: Map[Int, String]): Seq[Seq[(String, Double)]] = {
      val v = svd.V
      // Matrix.toArray is column-major, so column i is the slice [i * numRows, (i + 1) * numRows).
      val arr = v.toArray
      (0 until numConcepts).map { i =>
        val offs = i * v.numRows
        val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
        termWeights.sortBy(-_._1).take(numTerms)
          .map { case (score, id) => (termIds(id), score) }
          .toSeq
      }
    }
    /**
     * For each of the first `numConcepts` concepts (columns of U), returns the `numDocs`
     * documents with the largest weight, highest first.
     *
     * @param svd         decomposition computed with computeU = true
     * @param numConcepts number of concepts to report
     * @param numDocs     number of documents per concept
     * @param docIds      document id -> title
     * @return one Seq of (title, weight) pairs per concept
     */
    def topDocsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
        numDocs: Int, docIds: Map[Long, String]): Seq[Seq[(String, Double)]] = {
      val u = svd.U
      // NOTE(review): each concept runs a separate Spark job over u.rows; caching u.rows upstream
      // would avoid recomputing U once per concept.
      (0 until numConcepts).map { i =>
        // NOTE(review): assumes docIds was keyed with zipWithUniqueId over an RDD with the same
        // partitioning and row order as u.rows — verify at the call site.
        val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
        docWeights.top(numDocs).map { case (score, id) => (docIds(id), score) }.toSeq
      }
    }
    // Top 10 terms and top 10 documents for each of the first 10 concepts.
    val topConceptTerms = topTermsInTopConcepts(svd, numConcepts = 10, numTerms = 10, termIds = termIds)
    val topConceptDocs = topDocsInTopConcepts(svd, numConcepts = 10, numDocs = 10, docIds = docIds)
    topConceptTerms.zip(topConceptDocs).foreach { case (terms, docs) =>
      println("Concept terms: " + terms.map(_._1).mkString(", "))
      println("Concept docs: " + docs.map(_._1).mkString(", "))
    }

    5 相关度

    import breeze.linalg.{DenseMatrix => BDenseMatrix, DenseVector => BDenseVector,
    SparseVector => BSparseVector}
    /**
     * Finds the terms whose rows in `normalizedVS` score highest against the row of `termId`.
     *
     * @param normalizedVS one row per term (presumably V*S with unit-length rows, so the scores
     *                     are cosine similarities — confirm at the call site)
     * @param termId       row index of the query term
     * @param numResults   number of (score, term index) pairs to return (default 10)
     * @return (score, term index) pairs, highest score first
     */
    def topTermsForTerm(normalizedVS: BDenseMatrix[Double], termId: Int,
        numResults: Int = 10): Seq[(Double, Int)] = {
      // Look up the row in VS corresponding to the given term ID.
      val termRowVec = new BDenseVector[Double](row(normalizedVS, termId).toArray)
      // Compute scores against every term
      val termScores = (normalizedVS * termRowVec).toArray.zipWithIndex
      // Find the terms with the highest scores
      termScores.sortBy(-_._1).take(numResults).toSeq
    }
    /**
     * Finds the documents whose rows in `normalizedUS` score highest against the row of `docId`.
     *
     * @param normalizedUS one row per document (presumably U*S with unit-length rows — confirm)
     * @param docId        id of the query document
     * @param numResults   number of (score, doc id) pairs to return (default 10)
     * @return (score, doc id) pairs, highest score first
     */
    def topDocsForDoc(normalizedUS: RowMatrix, docId: Long,
        numResults: Int = 10): Seq[(Double, Long)] = {
      // Look up the row in US corresponding to the given doc ID.
      val docRowArr = row(normalizedUS, docId)
      val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr)
      // Compute scores against every doc
      val docScores = normalizedUS.multiply(docRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      // Docs can end up with NaN score if their row in U is all zeros.  Filter these out.
      allDocWeights.filter(!_._1.isNaN).top(numResults).toSeq
    }
    /**
     * Scores every document against the concept-space row of `termId`.
     *
     * @param US         one row per document
     * @param V          local matrix with one row per term
     * @param termId     row index of the query term in V
     * @param numResults number of (score, doc id) pairs to return (default 10)
     * @return (score, doc id) pairs, highest score first
     */
    def topDocsForTerm(US: RowMatrix, V: Matrix, termId: Int,
        numResults: Int = 10): Seq[(Double, Long)] = {
      val termRowArr = row(V, termId).toArray
      val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)
      // Compute scores against every doc
      val docScores = US.multiply(termRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      allDocWeights.top(numResults).toSeq
    }


    /**
     * Builds a sparse term-space query vector whose entry for each query term is its IDF.
     *
     * @param terms   query terms; every term must be present in `idTerms` and `idfs`
     *                (an unknown term throws NoSuchElementException)
     * @param idTerms term -> vector index
     * @param idfs    term -> IDF
     * @return a Breeze SparseVector of length `idTerms.size`
     */
    def termsToQueryVector(terms: Seq[String], idTerms: Map[String, Int], idfs: Map[String, Double])
      : BSparseVector[Double] = {
      // Breeze's SparseVector trusts its index array to be strictly increasing (lookups
      // binary-search it), so merge repeated terms (summing weights, as a dot product would)
      // and sort by index before constructing it.
      val weightsByIndex = terms
        .map(term => (idTerms(term), idfs(term)))
        .groupBy(_._1)
        .map { case (index, pairs) => (index, pairs.map(_._2).sum) }
        .toArray
        .sortBy(_._1)
      val indices = weightsByIndex.map(_._1)
      val values = weightsByIndex.map(_._2)
      new BSparseVector[Double](indices, values, idTerms.size)
    }
    /**
     * Scores every document against a term-space query vector folded into concept space.
     *
     * @param US         one row per document
     * @param V          local matrix with one row per term
     * @param query      term-space query vector (length = number of rows of V)
     * @param numResults number of (score, doc id) pairs to return (default 10)
     * @return (score, doc id) pairs, highest score first
     */
    def topDocsForTermQuery(US: RowMatrix, V: Matrix, query: BSparseVector[Double],
        numResults: Int = 10): Seq[(Double, Long)] = {
      // Both MLlib's Matrix.toArray and Breeze's DenseMatrix are column-major, so the copy is exact.
      val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray)
      // Project the query into concept space: V^T * query.
      val termRowArr = (breezeV.t * query).toArray
      val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)
      // Compute scores against every doc
      val docScores = US.multiply(termRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      allDocWeights.top(numResults).toSeq
    }
