zoukankan      html  css  js  c++  java
  • 数据挖掘:基于Spark+HanLP实现影视评论关键词抽取(1)

    1. 背景

    近日项目要求基于爬取的影视评论信息,抽取影视的关键字信息。考虑到影视评论数据量较大,因此采用Spark处理框架。关键词提取的处理主要包含分词+算法抽取两部分。目前分词工具包较为主流的,包括哈工大的LTP以及HanLP,而关键词的抽取算法较多,包括TF-IDF、TextRank、互信息等。本次任务主要基于LTP、HanLP、Ac双数组进行分词,采用TextRank、互信息以及TF-IDF结合的方式进行关键词抽取。

    说明:本项目刚开始接触,因此效果层面需迭代调优。

    2. 技术选型

    (1) 词典

    1) 基于HanLP项目提供的词典数据,具体可参见HanLP的github

    2) 考虑到影视的垂直领域特性,引入腾讯的嵌入的汉语词,参考该地址

    (2) 分词

    1) LTP分词服务:基于Docker Swarm部署多副本集服务,通过HTTP协议请求,获取分词结果(部署方法可百度); 也可以直接在本地加载,放在内存中调用,效率更高(未尝试)

    2) AC双数组:基于AC双数组,采用最长匹配串,采用HanLP中的AC双数组分词器

    (3) 抽取

    1) 经典的TF-IDF:基于词频统计实现

    2) TextRank:借鉴于PageRank算法,基于HanLP提供的接口

    3) 互信息:基于HanLP提供的接口

    3. 实现代码

    (1) 代码结构

    1) 代码将分词服务进行函数封装,基于不同的名称,执行名称指定的分词

    2) TextRank、互信息、LTP、AC双数组等提取出分词或短语,最后均通过TF-IDF进行统计计算

    (2) 整体代码

    1) 主体代码:细节层面与下载的原始评论数据结构有关,因此无需过多关注,只需关注下主体流程即可

      1 
      2 def extractFilmKeyWords(algorithm: String): Unit ={
      3     // 测试
      4 	println(HanLPSpliter.getInstance.seg("如何看待《战狼2》中的爱国情怀?"))
      5 
      6     val sc = new SparkContext(new SparkConf().setAppName("extractFileKeyWords").set("spark.driver.maxResultSize", "3g"))
      7 
      8     val baseDir = "/work/ws/video/parse/key_word"
      9 
     10     import scala.collection.JavaConversions._
     11     def extractComments(sc: SparkContext, inputInfo: (String, String)): RDD[(String, List[String])] = {
     12       sc.textFile(s"$baseDir/data/${inputInfo._2}")
     13         .map(data => {
     14           val json = JSONObjectEx.fromObject(data.trim)
     15           if(null == json) ("", List())
     16           else{
     17             val id = json.getStringByKeys("_id")
     18             val comments: List[String] = json.getArrayInfo("comments", "review").toList
     19             val reviews: List[String] = json.getArrayInfo("reviews", "review").toList
     20             val titles: List[String] = json.getArrayInfo("reviews", "title").toList
     21             val texts = (comments ::: reviews ::: titles).filter(f => !CleanUtils.isEmpty(f))
     22             (IdBuilder.getSourceKey(inputInfo._1, id), texts)
     23           }
     24         })
     25     }
     26 
     27     // 广播停用词
     28     val filterWordRdd = sc.broadcast(sc.textFile(s"$baseDir/data/stopwords.txt").map(_.trim).distinct().collect().toList)
     29 
     30     def formatOutput(infos: List[(Int, String)]): String ={
     31       infos.map(info => {
     32         val json = new JSONObject()
     33         json.put("status", info._1)
     34         try{
     35           json.put("res", info._2)
     36         } catch {
     37           case _ => json.put("res", "[]")
     38         }
     39         json.toString.replaceAll("[\s]+", "")
     40       }).mkString(" | ")
     41     }
     42 
     43     def genContArray(words: List[String]): JSONArray ={
     44       val arr = new JSONArray()
     45       words.map(f => {
     46         val json = new JSONObject()
     47         json.put("cont", f)
     48         arr.put(json)
     49       })
     50       arr
     51     }
     52 
     53 	// 基于LTP分词服务
     54     def splitWordByLTP(texts: List[String]): List[(Int, String)] ={
     55       texts.map(f => {
     56         val url = "http://dev.content_ltp.research.com/ltp"
     57         val params = new util.HashMap[String, String]()
     58         params.put("s", f)
     59         params.put("f", "json")
     60         params.put("t", "ner")
     61         // 调用LTP分词服务
     62         val result = HttpPostUtil.httpPostRetry(url, params).replaceAll("[\s]+", "")
     63         if (CleanUtils.isEmpty(result)) (0, f) else {
     64           val resultArr = new JSONArray()
     65 
     66           val jsonArr = try { JSONArray.fromString(result) } catch { case _ => null}
     67           if (null != jsonArr && 0 < jsonArr.length()) {
     68             for (i <- 0 until jsonArr.getJSONArray(0).length()) {
     69               val subJsonArr = jsonArr.getJSONArray(0).getJSONArray(i)
     70               for (j <- 0 until subJsonArr.length()) {
     71                 val subJson = subJsonArr.getJSONObject(j)
     72                 if(!filterWordRdd.value.contains(subJson.getString("cont"))){
     73                   resultArr.put(subJson)
     74                 }
     75               }
     76             }
     77           }
     78           if(resultArr.length() > 0) (1, resultArr.toString) else (0, f)
     79         }
     80       })
     81     }
     82 
     83 	// 基于AC双数组搭建的分词服务
     84     def splitWordByAcDoubleTreeServer(texts: List[String]): List[(Int, String)] ={
     85       texts.map(f => {
     86         val splitResults = SplitQueryHelper.splitQueryText(f)
     87           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
     88         if (0 == splitResults.size) (0, f) else (1, genContArray(splitResults).toString)
     89       })
     90     }
     91 
     92 	// 内存加载AC双数组
     93     def splitWordByAcDoubleTree(texts: List[String]): List[(Int, String)] ={
     94       texts.map(f => {
     95         val splitResults =  HanLPSpliter.getInstance().seg(f)
     96           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
     97         if (0 == splitResults.size) (0, f) else (1, genContArray(splitResults).toString)
     98       })
     99     }
    100 
    101 	// TextRank
    102     def splitWordByTextRank(texts: List[String]): List[(Int, String)] ={
    103       texts.map(f => {
    104         val splitResults = HanLP.extractKeyword(f, 100)
    105           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
    106         if (0 == splitResults.size) (0, f) else {
    107           val arr = genContArray(splitResults)
    108           if(0 == arr.length()) (0, f) else (1, arr.toString)
    109         }
    110       })
    111     }
    112 
    113 	// 互信息
    114     def splitWordByMutualInfo(texts: List[String]): List[(Int, String)] ={
    115       texts.map(f => {
    116         val splitResults = HanLP.extractPhrase(f, 50)
    117           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
    118         if (0 == splitResults.size) (0, f) else {
    119           val arr = genContArray(splitResults)
    120           if(0 == arr.length()) (0, f) else (1, arr.toString)
    121         }
    122       })
    123     }
    124 
    125     // 提取分词信息
    126     val unionInputRdd = sc.union(
    127 	  extractComments(sc, SourceType.DB -> "db_review.json"),
    128       extractComments(sc, SourceType.MY -> "my_review.json"),
    129       extractComments(sc, SourceType.MT -> "mt_review.json"))
    130       .filter(_._2.nonEmpty)
    131 
    132     unionInputRdd.cache()
    133 
    134     unionInputRdd.map(data => {
    135       val splitResults = algorithm match {
    136         case "ltp" => splitWordByLTP(data._2)
    137         case "acServer" => splitWordByAcDoubleTreeServer(data._2)
    138         case "ac" => splitWordByAcDoubleTree(data._2)
    139         case "textRank" => splitWordByTextRank(data._2)
    140         case "mutualInfo" => splitWordByMutualInfo(data._2)
    141       }
    142 
    143       val output = formatOutput(splitResults)
    144       s"${data._1}	$output"
    145     }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/wordSplit/$algorithm"))
    146 
    147     val splitRDD = sc.textFile(s"$baseDir/result/wordSplit/$algorithm/part*", 30)
    148       .flatMap(data => {
    149         if(data.split("\t").length < 2) None
    150         else{
    151           val sourceKey = data.split("\t")(0)
    152           val words = data.split("\t")(1).split(" \| ").flatMap(f => {
    153             val json = JSONObjectEx.fromObject(f.trim)
    154             if (null != json && "1".equals(json.getStringByKeys("status"))) {
    155               val jsonArr = try { JSONArray.fromString(json.getStringByKeys("res")) } catch { case _ => null }
    156               var result: List[(String, String)] = List()
    157               if (jsonArr != null) {
    158                 for (j <- 0 until jsonArr.length()) {
    159                   val json = jsonArr.getJSONObject(j)
    160                   val cont = json.getString("cont")
    161                   result ::= (cont, cont)
    162                 }
    163               }
    164               result.reverse
    165             } else None
    166           }).toList
    167           Some((sourceKey, words))
    168         }
    169       }).filter(_._2.nonEmpty)
    170 
    171     splitRDD.cache()
    172 
    173     val totalFilms = splitRDD.count()
    174 
    175     val idfRdd = splitRDD.flatMap(result => {
    176       result._2.map(_._1).distinct.map((_, 1))
    177     }).groupByKey().filter(f => f._2.size > 1).map(f => (f._1, Math.log(totalFilms * 1.0 / (f._2.sum + 1))))
    178 
    179     idfRdd.cache()
    180     idfRdd.map(f => s"${f._1}	${f._2}").saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/idf/$algorithm"))
    181 
    182     val idfMap = sc.broadcast(idfRdd.collectAsMap())
    183     // 计算TF
    184     val tfRdd = splitRDD.map(result => {
    185       val totalWords = result._2.size
    186       val keyWords = result._2.groupBy(_._1)
    187         .map(f => {
    188           val word = f._1
    189           val tf = f._2.size * 1.0 / totalWords
    190           (tf * idfMap.value.getOrElse(word, 0D), word)
    191         }).toList.sortBy(_._1).reverse.filter(_._2.trim.length > 1).take(50)
    192       (result._1, keyWords)
    193     })
    194 
    195     tfRdd.cache()
    196     tfRdd.map(f => {
    197       val json = new JSONObject()
    198       json.put("_id", f._1)
    199 
    200       val arr = new JSONArray()
    201       for (keyWord <- f._2) {
    202         val subJson = new JSONObject()
    203         subJson.put("score", keyWord._1)
    204         subJson.put("word", keyWord._2)
    205         arr.put(subJson)
    206       }
    207       json.put("keyWords", arr)
    208       json.toString
    209     }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/withScore"))
    210 
    211     tfRdd.map(f => s"${f._1}	${f._2.map(_._2).toList.mkString(",")}")
    212       .saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/noScore"))
    213 
    214     tfRdd.unpersist()
    215 
    216     splitRDD.unpersist()
    217     idfMap.unpersist()
    218     idfRdd.unpersist()
    219 
    220     unionInputRdd.unpersist()
    221     filterWordRdd.unpersist()
    222     sc.stop()
    223   }
    View Code

    2) 基于HanLP提供的AC双数组封装

      1 
      2 import com.google.common.collect.Lists;
      3 import com.hankcs.hanlp.HanLP;
      4 import com.hankcs.hanlp.seg.Segment;
      5 import com.hankcs.hanlp.seg.common.Term;
      6 import org.slf4j.Logger;
      7 import org.slf4j.LoggerFactory;
      8 
      9 import java.io.Serializable;
     10 import java.util.List;
     11 
     12 public class HanLPSpliter implements Serializable{
     13     private static Logger logger = LoggerFactory.getLogger(Act.class);
     14 
     15     private static HanLPSpliter instance = null;
     16 
     17     private static Segment segment = null;
     18 
     19     private static final String PATH = "conf/tencent_word_act.txt";
     20 
     21     public static HanLPSpliter getInstance() {
     22         if(null == instance){
     23             instance = new HanLPSpliter();
     24         }
     25         return instance;
     26     }
     27 
     28     public HanLPSpliter(){
     29         this.init();
     30     }
     31 
     32     public void init(){
     33         initSegment();
     34     }
     35 
     36     public void initSegment(){
     37         if(null == segment){
     38             addDict();
     39             HanLP.Config.IOAdapter = new HadoopFileIOAdapter();
     40             segment = HanLP.newSegment("dat");
     41             segment.enablePartOfSpeechTagging(true);
     42             segment.enableCustomDictionaryForcing(true);
     43         }
     44     }
     45 
     46     public List<String> seg(String text){
     47         if(null == segment){
     48             initSegment();
     49         }
     50 
     51         List<Term> terms = segment.seg(text);
     52         List<String> results = Lists.newArrayList();
     53         for(Term term : terms){
     54             results.add(term.word);
     55         }
     56         return results;
     57     }
     58 }
    View Code

    3) HanLP加载HDFS中的自定义词典

      1 import com.hankcs.hanlp.corpus.io.IIOAdapter;
      2 import org.apache.hadoop.conf.Configuration;
      3 import org.apache.hadoop.fs.FileSystem;
      4 import org.apache.hadoop.fs.Path;
      5 
      6 import java.io.IOException;
      7 import java.io.InputStream;
      8 import java.io.OutputStream;
      9 import java.net.URI;
     10 
     11 public class HadoopFileIOAdapter implements IIOAdapter{
     12     @Override
     13     public InputStream open(String path) throws IOException {
     14         Configuration conf = new Configuration();
     15         FileSystem fs = FileSystem.get(URI.create(path), conf);
     16         return fs.open(new Path(path));
     17     }
     18 
     19     @Override
     20     public OutputStream create(String path) throws IOException {
     21         Configuration conf = new Configuration();
     22         FileSystem fs = FileSystem.get(URI.create(path), conf);
     23         OutputStream out = fs.create(new Path(path));
     24         return out;
     25     }
     26 }
    View Code

    4. 采坑总结

    (1) Spark中实现HanLP自定义词典的加载

    由于引入腾讯的嵌入词,因此使用HanLP的自定义词典功能,参考的方法如下:

    a. 《基于hanLP的中文分词详解-MapReduce实现&自定义词典文件》,该方法适用于自定义词典的数量较少的情况,如果词典量较大,如腾讯嵌入词820W+,理论上jar包较为臃肿

    b. 《Spark中使用HanLP分词》,该方法的好处在于无需手工构件词典的bin文件,操作简单

    切记:如果想让自定义词典生效,需先将data/dictionary/custom中的bin文件删除。通过HanLP源码得知,如果存在bin文件,则直接加载该bin文件,否则会将custom中用户自定义的词典重新加载,在指定的环境中(如本地或HDFS)中自动生成bin文件。

    腾讯820W词典,基于HanLP生成bin文件的时间大概为30分钟。

    (2) Spark异常

    Spark执行过程中的异常信息:

    1) 异常1

    a. 异常信息:

    Job aborted due to stage failure: Total size of serialized results of 3979 tasks (1024.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

    b. 解决:通过设置spark.driver.maxResultSize=4G,参考:《Spark排错与优化

    2) 异常2

    a. 异常信息:java.lang.OutOfMemoryError: Java heap space

    b. 解决:参考https://blog.csdn.net/guohecang/article/details/52088117

    如有问题,请留言回复!

  • 相关阅读:
    Vue--运行项目发送http://localhost:8080/sockjs-node/info请求报错,造成浏览器不能热更新
    Vue笔记--同局域网下访问本地项目
    Vue笔记--通过自定义指令实现按钮操作权限
    css/css3实现未知宽高元素的垂直居中和水平居中
    【转载】Vue路由history模式踩坑记录:nginx配置解决404问题
    给动态生成的input框,添加readonly属性
    layui-form下隐藏元素的验证问题
    layui的省市县三级联动
    webstorm-激活码
    采坑
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/9902276.html
Copyright © 2011-2022 走看看