Spark Machine Learning 8: Text Processing (spark-shell)



    Natural Language Processing (NLP) in this chapter covers:

    • Feature extraction
    • Modeling
    • Machine learning

    TF-IDF (term frequency – inverse document frequency)

    • Term weighting: assign each term a weight based on its frequency
    • Feature hashing: use a hash function to assign each feature a vector index (see the sketch below)
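    For reference, MLlib's IDF computes idf(t) = log((N + 1) / (df(t) + 1)), where N is the total number of documents and df(t) is the number of documents containing term t; a term's TF-IDF weight is then tf(t, d) · idf(t). Feature hashing skips building a term dictionary by hashing each term straight to a vector index. Below is a minimal sketch of the idea in plain Scala; it is illustrative only (Spark's HashingTF uses its own internal hash function, and termIndex is a made-up helper):

    // Toy version of the hashing trick: map a term to a bucket in [0, dim).
    def termIndex(term: String, dim: Int): Int = {
        val raw = term.hashCode % dim
        if (raw < 0) raw + dim else raw // hashCode can be negative
    }

    termIndex("hockey", math.pow(2, 18).toInt) // some index in [0, 262144)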

    0 Runtime environment

    tar xfvz 20news-bydate.tar.gz
    
    export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
    cd $SPARK_HOME
    bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4 --driver-memory 4G --executor-memory 4G --driver-cores 2
    

    1 Feature extraction

    // read the whole training set: each element is a (filename, content) pair
    val PATH = "/Users/erichan/sourcecode/book/Spark机器学习/20news-bydate"
    val path = PATH+"/20news-bydate-train/*"
    val rdd = sc.wholeTextFiles(path)
    println(rdd.count)
    

    11314

    Inspect the newsgroup topics:

    // the newsgroup topic is the parent directory name of each file
    val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
    val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")
    println(countByGroup)
    

    (rec.sport.hockey,600)
    (soc.religion.christian,599)
    (rec.motorcycles,598)
    (rec.sport.baseball,597)
    (sci.crypt,595)
    (rec.autos,594)
    (sci.med,594)
    (comp.windows.x,593)
    (sci.space,593)
    (sci.electronics,591)
    (comp.os.ms-windows.misc,591)
    (comp.sys.ibm.pc.hardware,590)
    (misc.forsale,585)
    (comp.graphics,584)
    (comp.sys.mac.hardware,578)
    (talk.politics.mideast,564)
    (talk.politics.guns,546)
    (alt.atheism,480)
    (talk.politics.misc,465)
    (talk.religion.misc,377)

    2 Modeling

    2.1 Tokenization

    // naive tokenization: split on spaces only, lowercase everything
    val text = rdd.map { case (file, text) => text }
    val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
    println(whiteSpaceSplit.distinct.count)
    println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))
    

    402978
    from:,mathew,mathew,faq:,faq:,atheist,resources
    summary:,music,--,fiction,,mantis,consultants,,uk.
    supersedes:,290

    archive-name:,1.0

    ,,,,,,,,,,,,,,,,,,,organizations

    ,organizations

    ,,,,,,,,,,,,,,,,stickers,and,and,the,from,from,in,to:,to:,ffrf,,256-8900

    evolution,designs

    evolution,a,stick,cars,,written
    inside.,fish,us.

    write,evolution,,,,,,,bay,can,get,get,,to,the
    price,is,of,the,the,so,on.,and,foote.,,atheist,pp.,0-910309-26-4,,,atrocities,,foote:,aap.,,the

    2.2 Improved tokenization

    // split on non-word characters instead of just spaces
    val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
    println(nonWordSplit.distinct.count)
    println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))
    
    // keep only tokens that contain no digits
    val regex = """[^0-9]*""".r
    val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
    println(filterNumbers.distinct.count)
    println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))
    

    2.3 Removing stop words

    val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
    val orderingDesc = Ordering.by[(String, Int), Int](_._2)
    //println(tokenCounts.top(20)(orderingDesc).mkString("\n"))
    
    val stopwords = Set(
        "the","a","an","of","or","in","for","by","on","but", "is", "not", "with", "as", "was", "if",
        "they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to"
    )
    val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
    //println(tokenCountsFilteredStopwords.top(20)(orderingDesc).mkString("\n"))
    
    // drop single-character tokens as well
    val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
    println(tokenCountsFilteredSize.top(20)(orderingDesc).mkString("\n"))
    

    2.4 Removing low-frequency words

    val orderingAsc = Ordering.by[(String, Int), Int](-_._2)
    //println(tokenCountsFilteredSize.top(20)(orderingAsc).mkString("\n"))
    
    // collect the tokens that occur only once across the whole corpus
    val rareTokens = tokenCounts.filter{ case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
    val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
    println(tokenCountsFilteredAll.top(20)(orderingAsc).mkString("\n"))
    
    // the full pipeline: split on non-word characters, lowercase, then drop
    // tokens with digits, stop words, rare tokens and single characters
    def tokenize(line: String): Seq[String] = {
        line.split("""\W+""")
            .map(_.toLowerCase)
            .filter(token => regex.pattern.matcher(token).matches)
            .filterNot(token => stopwords.contains(token))
            .filterNot(token => rareTokens.contains(token))
            .filter(token => token.size >= 2)
            .toSeq
    }
    
    //println(text.flatMap(doc => tokenize(doc)).distinct.count)
    val tokens = text.map(doc => tokenize(doc))
    println(tokens.first.take(20))
    

    2.5 Stemming

    Stemming reduces tokens to a base form. It is a standard step in NLP and search-engine pipelines, and ready-made stemmers are available in the libraries below; a toy sketch of the idea follows this list.

    • NLTK
    • OpenNLP
    • Lucene
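    A toy illustration of suffix stripping in plain Scala; naiveStem is a made-up helper and not a real stemmer, so a real project should use one of the libraries above:

    // Strip a few common English suffixes. Illustrative only.
    def naiveStem(token: String): String = {
        val suffixes = Seq("ing", "ers", "er", "ed", "s")
        suffixes.find(s => token.endsWith(s) && token.length > s.length + 2)
            .map(s => token.dropRight(s.length))
            .getOrElse(token)
    }

    naiveStem("walkers") // "walk"
    naiveStem("played")  // "play"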

    3 Training the model

    3.1 HashingTF feature hashing

    import org.apache.spark.mllib.linalg.{ SparseVector => SV }
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.feature.IDF
    // set the dimensionality of TF-IDF vectors to 2^18
    val dim = math.pow(2, 18).toInt
    val hashingTF = new HashingTF(dim)
    val tf = hashingTF.transform(tokens)
    tf.cache
    
    val v = tf.first.asInstanceOf[SV]
    println(v.size)
    println(v.values.size)
    println(v.values.take(10).toSeq)
    println(v.indices.take(10).toSeq)
    

    262144
    706
    WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)
    WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)
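    The vectors live in a 2^18 = 262144-dimensional space, but only the nonzero entries are stored: this first document has 706 of them, one per distinct hashed term.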

    Fit the IDF model on the term-frequency vectors, then transform them into TF-IDF vectors:

    val idf = new IDF().fit(tf)
    val tfidf = idf.transform(tf)
    val v2 = tfidf.first.asInstanceOf[SV]
    println(v2.values.size)
    println(v2.values.take(10).toSeq)
    println(v2.indices.take(10).toSeq)
    

    706
    WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, 8.932700215224111, 5.750365619611528, 2.1871123786150006, 5.520408782213984, 3.4312512246662714, 1.7430324343790569)
    WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

    3.2 Analyzing the weightings

    // find the smallest and largest TF-IDF weight within each document
    val minMaxVals = tfidf.map { v =>
        val sv = v.asInstanceOf[SV]
        (sv.values.min, sv.values.max)
    }
    val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
        (math.min(min1, min2), math.max(max1, max2))
    }
    println(globalMinMax)
    

    globalMinMax: (Double, Double) = (0.0,66155.39470409753)

    Common words receive relatively low weightings:

    val common = sc.parallelize(Seq(Seq("you", "do", "we")))
    val tfCommon = hashingTF.transform(common)
    val tfidfCommon = idf.transform(tfCommon)
    val commonVector = tfidfCommon.first.asInstanceOf[SV]
    println(commonVector.values.toSeq)
    

    WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)

    Rarely occurring words receive higher weightings:

    val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
    val tfUncommon = hashingTF.transform(uncommon)
    val tfidfUncommon = idf.transform(tfUncommon)
    val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
    println(uncommonVector.values.toSeq)
    

    WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)

    4 Using the model

    4.1 Cosine similarity
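    Cosine similarity is the dot product of two vectors divided by the product of their L2 norms, cos(a, b) = (a · b) / (‖a‖ ‖b‖). A value near 1 means the two documents share many heavily weighted terms; a value near 0 means little overlap.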

    import breeze.linalg._
    
    val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }
    val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))
    val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))
    
    // pick two hockey documents pseudo-randomly and convert them to Breeze
    // sparse vectors so we can use dot products and norms
    val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
    val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)
    
    val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]
    val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)
    val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
    println(cosineSim)
    

    cosineSim: Double = 0.060250114361164626

    val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }
    val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))
    val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))
    val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
    val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)
    val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))
    println(cosineSim2)
    

    cosineSim2: Double = 0.004664850323792852

    val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }
    val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))
    val baseballTfIdf = idf.transform(baseballTF.map(_._2))
    val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
    val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)
    val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))
    println(cosineSim3)
    

    0.05047395039466008

    4.2 Learning a mapping between words and topics

    Multiclass label mapping:
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    
    // map each of the 20 topic names to a numeric class index
    val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
    val zipped = newsgroups.zip(tfidf)
    val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
    train.cache
    
    Train the naive Bayes model:
    val model = NaiveBayes.train(train, lambda = 0.1)
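    Here lambda = 0.1 sets the additive (Laplace) smoothing applied to the term counts, which avoids zero probabilities for terms never seen in a given class.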
    
    Load the test dataset:
    val testPath = PATH+"/20news-bydate-test/*"
    val testRDD = sc.wholeTextFiles(testPath)
    val testLabels = testRDD.map { case (file, text) =>
        val topic = file.split("/").takeRight(2).head
        newsgroupsMap(topic)
    }
    val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }
    val testTfIdf = idf.transform(testTf)
    val zippedTest = testLabels.zip(testTfIdf)
    val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
    
    Compute the accuracy and the multiclass weighted F-measure:
    val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
    println(accuracy)
    

    0.7915560276155071

    val metrics = new MulticlassMetrics(predictionAndLabel)
    println(metrics.weightedFMeasure)
    

    0.7810675969031116

    5 Evaluation
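    As a baseline, train the same model on raw whitespace-split tokens, with no lowercasing and no number, stop-word or rare-word filtering, and compare: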

    val rawTokens = rdd.map { case (file, text) => text.split(" ") }
    val rawTF = rawTokens.map(doc => hashingTF.transform(doc))
    val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
    val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)
    val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }
    val rawZippedTest = testLabels.zip(rawTestTF)
    val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
    val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))
    val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()
    println(rawAccuracy)
    

    0.7648698884758365

    val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)
    println(rawMetrics.weightedFMeasure)
    

    0.7653320418573546
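    The cleaned-up tokenization pipeline thus lifts accuracy from roughly 0.765 to 0.792 and the weighted F-measure from 0.765 to 0.781.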

    6 The Word2Vec model

    The Word2Vec model (a distributed vector representation) represents each word as a vector; MLlib's implementation uses the skip-gram model.
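    Skip-gram trains a shallow network to predict the context words surrounding each target word, so words that occur in similar contexts end up with nearby vectors. This is why findSynonyms below returns topically related words rather than strict dictionary synonyms.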

    6.1 Training

    import org.apache.spark.mllib.feature.Word2Vec
    val word2vec = new Word2Vec()
    word2vec.setSeed(42) // we do this to generate the same results each time
    val word2vecModel = word2vec.fit(tokens)
    

    6.2 Usage

    The 20 words most similar to "hockey":
    word2vecModel.findSynonyms("hockey", 20).foreach(println)
    

    (sport,1.4818968962277133)
    (ecac,1.467546566194254)
    (hispanic,1.4166835301985194)
    (glens,1.4061103042432825)
    (woofers,1.3810090447028116)
    (tournament,1.3148823031671586)
    (champs,1.3133863003013941)
    (boxscores,1.307735040384543)
    (aargh,1.274986851270267)
    (ahl,1.265165428167253)
    (playoff,1.2645991118770572)
    (ncaa,1.2383382015648046)
    (pool,1.2261154635870224)
    (champion,1.2119919989539134)
    (filinuk,1.2062208620660915)
    (olympic,1.2026738930160243)
    (motorcycles,1.2008032355579679)
    (yankees,1.1989755767973371)
    (calder,1.194001886835493)
    (homeruns,1.1800625883573932)
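    The 20 words most similar to "legislation":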

    word2vecModel.findSynonyms("legislation", 20).foreach(println)
    

    (accommodates,0.9918184454068688)
    (briefed,0.9256758135452989)
    (amended,0.9105987267173344)
    (telephony,0.8679173760123956)
    (pitted,0.8609974033962533)
    (aclu,0.8605885863332372)
    (licensee,0.8493930472487975)
    (agency,0.836706135804648)
    (policies,0.8337986602365566)
    (senate,0.8327312936220903)
    (businesses,0.8291191155630467)
    (permit,0.8266658804181389)
    (cpsr,0.8231228090944367)
    (cooperation,0.8195562469006543)
    (surveillance,0.8134342524628756)
    (congress,0.8132899468772855)
    (restricted,0.8115013134507126)
    (procure,0.8096839595766356)
    (inquiry,0.8086297702914405)
    (industry,0.8077900093754752)

    Note how topically related the synonyms for "legislation" are: aclu (the American Civil Liberties Union), senate, surveillance, inquiry.