zoukankan      html  css  js  c++  java
  • Spark实现TF-IDF——文本相似度计算

            在Spark1.2之后,Spark自带实现TF-IDF接口,只要直接调用就可以,但实际上,Spark自带的词典大小设置较于古板,如果设置小了,则导致无法计算,如果设置大了,Driver端回收数据的时候,容易发生OOM,所以更多时候都是自己根据实际情况手动实现TF-IDF。不过,在本篇文章中,两种方式都会介绍。

    数据准备:

            val df = ss.sql("select * from bigdatas.news_seg")
            //如果hive表的数据没有切词,则先对数据进行切词操作(hive里面每一行是用空格将各个词连接的字符串,或者说是一篇文章,结尾使用##@@##标识),得到一个数组类型数据
            val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg")

    一、Spark自带TF-IDF

    1、Spark自带TF实现

            首先需要实例化HashingTF,这个类用于根据给传入的各篇已经分好词的文章,对里面的每个词进行hashing计算,每个hashing值对应词表的一个位置,以及对每个词在每篇文章中的一个统计;

            这个类有一个方法setBinaty()可以设置其统计时的计算方式:多项式分布计算和伯努利分布计算:

    • setBinary(false):多项式分布计算,一个词在一篇文章中出现多少次,计算多少次;
    • setBinary(true):伯努利分布计算,一个词在一篇文章中,不管多少次,只要出现了,就为1,否则为0

            还有一个重要方法setNumFeatures(),用于设置词表的大小,默认是2^18。

            实例化HashingTF之后,使用transform就可以计算词频(TF)。

    TF代码实现:

    //            多项式分布计算
            val hashingTF = new HashingTF()
                .setBinary(false)
                .setInputCol("seg")
                .setOutputCol("feature_tf")
                .setNumFeatures(1<<18)
    //            伯努利分布计算
            val hashingTF_BN = new HashingTF()
                .setBinary(true)
                .setInputCol("seg")
                .setOutputCol("feature_tf")
                .setNumFeatures(1<<18)
     
            /**
              * hashingTF.transform(df_seg):转换之后会在原来基础上增加一列,就是setOutputCol("feature_tf")设置的列
              * 新增列的数据结构为:(词表大小,[该行数据的每个词对应词表的hashCode],[该行数据的每个词在该行数据出现的次数,即多项式统计词频])
              */
            val df_tf = hashingTF.transform(df_seg).select("feature_tf")

    最后列“feature_tf”的数据结构为(词典大小, [hashingCode], [term freq])。

    2、Spark自带实现TF-IDF

    对word进行idf加权(完成tf计算的基础上)

            实现原理跟上一步的TF类似,但多出一步,这一步是用于扫描一次上一步计算出来的tf数据。

    代码如下:

            val idf = new IDF()
                .setInputCol("feature_tf")
                .setOutputCol("feature_tfidf")
                .setMinDocFreq(2)
            //fit():内部对df_tf进行遍历,统计doc Freq,这个操作是在tf完成后才做的
            val idfModel = idf.fit(df_tf)
            val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf")

    二、自己实现TF-IDF

    手动实现,有计算的先后问题,必须先算DocFreq(DF),再算TermFreq(TF)。

    1、doc Freq 文档频率计算 -> 同时可以得到所有文章的单词集合(词典)

            df的数据是每一行代表一篇文章,那么在计算某个词出现的文章次数,那么转化为某个词的统计。即将一篇文章切好词之后,放在一个set集合里面,表示这个set集合的每个词出现1次;那么将所有文章的词都切好,放在set集合里面,每篇文章拥有一个set集合,然后再根据词groupBy,Count,就可以得到每个词的DocFreq。

            val setUDF = udf((str:String)=>str.split(" ").distinct)
    //        1.1、对每篇文章的词进行去重操作,即set集合
            val df_set = df.withColumn("words_set",setUDF(df("sentence")))
    //        1.2、行转列,groupby、count,顺带可以求出词典大小,以及每个词对应在词典的位置index
            val docFreq_map = df_set.select(explode(df_set("words_set")).as("word"))
                .groupBy("word")
                .count()
                .rdd
                .map(x=>(x(0).toString,x(1).toString))
                .collectAsMap() //collect有一个重要特性,就是会将数据回收到Driver端,方便分发

    上面计算出来的数据格式为:Map(word->wordCount),或理解为:Map(word->DocFreq)

            除了计算出DF外,还要顺便计算词典大小,因为词典大小代表了向量的大小,以及每个词对应词典的位置。

            val dictSize = docFreq_map.size
    //        对单词进行编码,得到索引,类型为int,每个词对应于[0,dictSize-1]区间的一个位置
            val wordEncode = docFreq_map.keys.zipWithIndex.toMap

    2、term Freq 词频计算 -> 同时计算tf-idf

            词频计算其实就是做wordCount,这里重点还需要顺带计算TF-IDF。

            val docCount = df_seg.count()
            val mapUDF = udf { str: String =>
        //每一行处理就是处理一篇文章
    //            wordCOunt
                val tfMap = str.split("##@##")(0).split(" ")
                    .map((_,1L))
                    .groupBy(_._1)
                    .mapValues(_.length)
     
    //            tfMap{term->termCount}
    //            docFreq_map{term->termDocCount}
    //            wordEncode{term->index}
    //            处理后的tfIDFMap数据结果为:[(index:int,tf_idf:Double)]     //必须要处理成这种形式
                val tfIDFMap = tfMap.map{x=>
                    val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1))
                    (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)
                }
    //              做成向量:第一个参数为向量大小(词典大小);第二个参数用于给指定的index赋值为tf-idf
                Vectors.sparse(dictSize,tfIDFMap.toSeq)
            }
            val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence")))

    三、完整Demo代码

    package com.cjs
     
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.ml.feature.HashingTF
    import org.apache.spark.ml.feature.IDF
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
     
    object TFIDFTransform {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
     
            val conf = new SparkConf()
                .set("spark.some.config.option","some-value")
     
            val ss = SparkSession
                .builder()
                .config(conf)
                .enableHiveSupport()
                .appName("test_tf-idf")
                //.master("local[2]") //单机版配置,集群情况下,可以去掉
                .getOrCreate()
     
            val df = ss.sql("select * from bigdatas.news_seg")
     
            //如果hive表的数据没有切词,则先对数据进行切词操作(hive里面每一行是用空格将各个词连接的字符串,或者说是一篇文章,结尾使用##@@##标识),得到一个数组类型数据
            val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg")
     
            val docCount = df_seg.count()
            //一、spark自带tf-idf实现
            //词典默认是2^20,先给一个词的一个hashCode,对应于词典的一个位置
            //词典空间过大,Driver进行数据回收时,容易出现OOM
    //        1、spark自带TF实现
            /**
              setBinary:
                 false:多项式分布 -> 一个词在一篇文章中出现多少次,计算多少次
                 true:伯努利分布 -> 一个词在一篇文章中,不管多少次,只要出现了,就为1,否则为0
              **/
            /**
              * setInputCol("seg") :输入参数(DF)的列名
              * setOutputCol("feature_tf"):输出结果(结果)的列名
              * setNumFeatures(1<<18):设置词表大小,默认是1<<18
              */
    //            多项式分布计算
            val hashingTF = new HashingTF()
                .setBinary(false)
                .setInputCol("seg")
                .setOutputCol("feature_tf")
                .setNumFeatures(1<<18)
    //            伯努利分布计算
            val hashingTF_BN = new HashingTF()
                .setBinary(true)
                .setInputCol("seg")
                .setOutputCol("feature_tf")
                .setNumFeatures(1<<18)
     
            /**
              * hashingTF.transform(df_seg):转换之后会在原来基础上增加一列,就是setOutputCol("feature_tf")设置的列
              * 新增列的数据结构为:(词表大小,[该行数据的每个词对应词表的hashCode],[该行数据的每个词在该行数据出现的次数,即多项式统计词频])
              */
            val df_tf = hashingTF.transform(df_seg).select("feature_tf")
     
    //        2、spark自带IDF实现, 对word进行idf加权(完成tf计算的基础上)
            val idf = new IDF()
                .setInputCol("feature_tf")
                .setOutputCol("feature_tfidf")
                .setMinDocFreq(2)
            //fit():内部对df_tf进行遍历,统计doc Freq,这个操作是在tf完成后才做的
            val idfModel = idf.fit(df_tf)
            val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf")
     
     
            //二、自己实现tf-idf,有顺序的实现
            //1、doc Freq 文档频率计算 -> 同时可以得到所有文章的单词集合(词典)
            val setUDF = udf((str:String)=>str.split(" ").distinct)
    //        1.1、对每篇文章的词进行去重操作,即set集合
            val df_set = df.withColumn("words_set",setUDF(df("sentence")))
    //        1.2、行转列,groupby、count,顺带可以求出词典大小,以及每个词对应在词典的位置index
            val docFreq_map = df_set.select(explode(df_set("words_set")).as("word"))
                .groupBy("word")
                .count()
                .rdd
    //            .map(x=>(x(0).toString,x(1).toString))
                .map(x=>(x(0).toString,math.log10(docCount.toDouble/(x(1).toString.toDouble+1))))   //顺带计算idf
                .collectAsMap() //collect有一个重要特性,就是会将数据回收到Driver端,方便分发
            val dictSize = docFreq_map.size
    //        对单词进行编码,得到索引,类型为int,每个词对应于[0,dictSize-1]区间的一个位置
            val wordEncode = docFreq_map.keys.zipWithIndex.toMap
            //2、term Freq 词频计算
    //        返回数据结构:
            val mapUDF = udf { str: String =>
        //每一行处理就是处理一篇文章
    //            wordCOunt
                val tfMap = str.split("##@##")(0).split(" ")
                    .map((_,1L))
                    .groupBy(_._1)
                    .mapValues(_.length)
     
    //            tfMap{term->termCount}
    //            docFreq_map{term->termDocCount}
    //            wordEncode{term->index}
    //            处理后的tfIDFMap数据结果为:[(index:int,tf_idf:Double)]     //必须要处理成这种形式
                val tfIDFMap = tfMap.map{x=>
    //                val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1))
    //                (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)
                    val idf_v = docFreq_map.getOrElse(x._1,0.0)
                    (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)  //已经在第1步算出idf情况下使用
                }
    //              做成向量:第一个参数为向量大小(词典大小);第二个参数用于给指定的index赋值为tf-idf
                Vectors.sparse(dictSize,tfIDFMap.toSeq)
            }
            val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence")))
        }
    }

     

  • 相关阅读:
    Java实现 LeetCode 343 整数拆分(动态规划入门经典)
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 342 4的幂
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 341 扁平化嵌套列表迭代器
    Java实现 LeetCode 338 比特位计数
    H264(NAL简介与I帧判断)
    分享一段H264视频和AAC音频的RTP封包代码
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11466724.html
Copyright © 2011-2022 走看看