zoukankan      html  css  js  c++  java
  • 基于mllib的spark中文文本分类(朴素贝叶斯)

    基于mllib的spark中文文本分类(朴素贝叶斯)

    本文参考博客 https://blog.csdn.net/github_36326955/article/details/54891204
    使用spark中ml包进行中文文本分类参见 https://www.cnblogs.com/DismalSnail/p/11802281.html

    首先介绍一下文本分类的大致流程

    • 预处理
    • 中文分词
    • 构建词向量空间
    • 训练模型
    • 用训练好的模型进行预测
    • 通过预测结果对模型进行评估

    预处理

    • 语料库
    • 文本格式转换

    语料库

    要进行文本分类,首先要有文本,复旦中文文本语料库

    百度云盘链接:https://pan.baidu.com/s/1nKAmM8EuF54sgtMGhZN9tw

    密码 ns8e

    文本格式转换

    由于下载的语料库是GBK格式的,为了处理方便,需要转成UTF-8的格式,转换代码如下


    package com.classification.text
    
    import java.io.File
    
    import org.apache.commons.io.FileUtils //Java的文件处理工具包
    
    object GBK2UTF {
    
      def GBK2UTF8(GBKCorpusPath: String, UTF8CorpusPath: String): Unit = {
        //打开根目录
        val GBKCorpusDir: Array[File] = new File(GBKCorpusPath).listFiles()
        //对应的UTF-8格式的目录是否存在,不存在新建
        val UTFCorpusDir: File = new File(UTF8CorpusPath);
        if (!UTFCorpusDir.exists()) {
          UTFCorpusDir.mkdir()
        }
    
        //打开类别目录
        for (gbkClassDir: File <- GBKCorpusDir) {
          //记录目录路径,为创建UTF-8格式的文件夹和文件提供路径
          val UTFClassDirPath: String = UTF8CorpusPath + gbkClassDir.getName
          //UTF-8格式的类别目录是否存在,不存在新建
          val UTFClassDir: File = new File(UTFClassDirPath)
          if (!UTFClassDir.exists()) {
            UTFClassDir.mkdir()
          }
    
          for (gbkText: File <- gbkClassDir.listFiles()) {
            //将文件以GBK格式读取为字符串,转为UTF-8格式后写入新文件
            FileUtils.write(new File(UTFClassDirPath + "/" + gbkText),
              FileUtils.readFileToString(gbkText, "GBK"), "UTF-8")
          }
        }
    
      }
    
    
      def main(args: Array[String]): Unit = {
        GBK2UTF8("./train_corpus/", "./utf_train_corpus/")
        GBK2UTF8("./test_corpus/", "./utf_test_corpus/")
      }
    
    }
    

    中文分词

    • 分词工具介绍
    • 选择Ansj作为分词工具,以及注意事项
    • Ansj中文分词实现

    分词工具介绍

    中文分词的理论部分很多博客都有介绍,这里主要介绍代码实现(理论咱现在也不会,就会调用API)。如果用Python,分词一般选择jieba分词,jieba分词也有Java版的,但是用起来不是很方便。如果用Java或Scala,就要选择Java版的中文分词工具,我用的主要是Ansj和HanLP,两个分词工具可以百度,感觉HanLP比较强大。

    选择Ansj作为分词工具,以及注意事项

    本次实验选择Ansj作为中文文本分类的工具。注意:如果需要添加自定义词典,词典内的空白都必须是tab,但是如果使用Idea编辑词典文件,tab键默认为4个空格(这种细节注意不到会让人崩溃)

    Ansj中文分词的实现


    package com.classification.text
    
    import java.io.File
    import java.util
    
    import org.ansj.domain.Result
    import org.ansj.recognition.impl.StopRecognition
    import org.ansj.splitWord.analysis.ToAnalysis
    import org.apache.commons.io.FileUtils
    
    //scala集合与java集合转换的包,按住Ctrl点进源码,可以查看转换规则
    import scala.collection.JavaConversions._
    
    
    object WordSplit {
    
      //分词函数
      def corpusSegment(utfCorpusPath: String, utfSegmentPath: String, trainLabelListPath: String, trainSegmentPath: String): Unit = {
        //计数用,统计样本个数
        var count = 0
        //存放标签的Java数组,这里使用java数组是为了方便写入文件
        val labelList = new util.ArrayList[String]()
        //存放分词后字符串的数组,同样为了方便写入文件
        val contextList = new util.ArrayList[String]()
        //打开根目录
        val corpusDir: Array[File] = new File(utfCorpusPath).listFiles()
        //类别目录
        for (corpusClassDir: File <- corpusDir) {
          //每一个文件
          for (utfText <- corpusClassDir.listFiles()) {
    
            count = count + 1
            //调用分词方法
            val textSeg: Result = ToAnalysis.parse(FileUtils.readFileToString(utfText)
              .replace("
    ", "") //去除换行和回车
              .replace("
    ", "") //去除单独的回车
              .replace("
    ", "") //去除单独的换行
              .replace(" ", "") //去除空格
              .replace("u3000", "") //去除全角空格(中文空格)
              .replace("	", "") //去除制表符
              .replaceAll(s"\pP|\pS|\pC|\pN|\pZ", "") //通过设置Unicode类别的相关正则去除符号
              .trim
            )
            //读取停用词,就是一些对分类没有作用的词,去除可以对特征向量降维
            val stopWordList: Seq[String] = FileUtils.readFileToString(new File("./stopWords/stop_word_chinese.txt"))
              .split("
    ").toSeq
            //新建停用词对象
            val filter = new StopRecognition()
            //加载停用词列表
            filter.insertStopWords(stopWordList)
            //去除停用词
            val result: Result = textSeg.recognition(filter)
    
            /**
             *这里如果将每篇文章的分词单独写入一个文件,则在构建词向量时,spark
             * 就要分别读取每篇文章的分词,而spark每读一个文件,就会就会产生一个RDD,
             * 这样读取所有文本的分词就会产生巨量的RDD,这时把这些分词合并到一个集合中(巨量的RDD
             * 合并成一个RDD)时,spark在构建DAG时就会爆掉(亲身经历,当时用的时RDD的union方法)
             */
            
            //将分词内容加入列表
            contextList.add(result.toStringWithOutNature)
            //将标签加入列表,标签的顺序和文本分词后的顺序是对应的
            labelList.add(corpusClassDir.getName)
    
          }
        }
        println(count)
        //将分词写入文件
        FileUtils.writeLines(new File(trainSegmentPath), "UTF-8", contextList)
        //将文本标签写入文件
        FileUtils.writeLines(new File(trainLabelListPath), "UTF-8", labelList)
    
      }
    
      def main(args: Array[String]): Unit = {
        //这里该了一些目录结构,对代码的功能没有影响
        corpusSegment("./train/utf_train_corpus/", "./train/utf_train_segment/",
          "./train/train_label_list.txt", "./train/train_seg.txt")
    
        corpusSegment("./test/utf_test_corpus/", "./test/utf_test_segment/",
          "./test/test_label_list.txt", "./test/test_seg.txt")
      }
    }
    
    

    构建词向量空间、训练、预测、评估

    分词完成后就可以构建词向量、训练、预测、评估了


    package com.classification.text
    
    import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
    import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Classification {
      //读取分词文件和标签文件,两个文件读取后都是RDD形式,元组的形式返回
      def getDocumentsAndLabels(sc: SparkContext, segPath: String, labelListPath: String): (RDD[Seq[String]], Iterator[String]) = {
        (sc.textFile(segPath).map(_.split(",").toSeq), sc.textFile(labelListPath).collect().toSeq.toIterator)
      }
    
      //训练函数
      def train(sc: SparkContext, trainSegPath: String, trainLabelListPath: String): NaiveBayesModel = {
        //读取训练集的分词和标签
        val (documents, labelList) = getDocumentsAndLabels(sc, trainSegPath, trainLabelListPath)
        //新建HashingTF类
        val hashingTF: HashingTF = new HashingTF()
        //计算TF值
        val tf: RDD[Vector] = hashingTF.transform(documents)
    
        //缓存,为了计算快,对功能没有影响
        tf.cache()
        //计算IDF值
        val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
        //计算TF-IDF值
        val tfIdf: RDD[Vector] = idf.transform(tf)
        //将TFIDF数据,结合标签,转为LabelPoint数据,LabelPoint是训练函数NaiveBayes.train()的输入数据格式
        val training: RDD[LabeledPoint] = tfIdf.map {
          vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
        }
        //训练函数训练,
        NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
      }
    
      //测试函数,参数model为训练集训练后的模型
      def test(sc: SparkContext, testSegPath: String, testLabelListPath: String, model: NaiveBayesModel): Double = {
    
        //读取测试数据集分词和标签数据
        val (documents, labelList) = getDocumentsAndLabels(sc, testSegPath, testLabelListPath)
    
        //和训练的步骤差不多
        val hashingTF: HashingTF = new HashingTF()
        val tf: RDD[Vector] = hashingTF.transform(documents)
        tf.cache()
        val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
        val tfIdf: RDD[Vector] = idf.transform(tf)
        val test: RDD[LabeledPoint] = tfIdf.map {
          vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
        }
        //预测
        val predictionAndLabel: RDD[(Double, Double)] = test.map((p: LabeledPoint) => (model.predict(p.features), p.label))
        //计算准确率
        1.0 * predictionAndLabel.filter((x: (Double, Double)) => x._1 == x._2).count() / test.count()
      }
    
      //获取标签对应的Double数值,将标签中的数组作为标签对应的数值
      //C11Space -> 11.0
      def getDoubleOfLabel(label: String): Double = {
        label.split("-")(0).tail.toDouble
      }
    
      def main(args: Array[String]): Unit = {
        //新建spark上下文
        val conf: SparkConf = new SparkConf().setAppName("Classification").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        //调用处理函数
        println(test(sc, "./test/test_seg.txt",
          "./test/test_label_list.txt",
          train(sc,
            "./train/train_seg.txt",
            "./train/train_label_list.txt"
          )
        )
        )
      }
    }
    
    

    到此分词的步骤就结束了,要想提高分词的准确率可以尝试不同的分词工具和文本分类算法

  • 相关阅读:
    oracle修改字符编码
    oracle修改约束列
    oracle非空约束
    Linux修改字符集
    修改oracle字符集合
    word问题禁止宏
    增加修改表列
    oracle增加sequence
    增加 修改oracle约束条件
    oracle用户 密码永不过期
  • 原文地址:https://www.cnblogs.com/DismalSnail/p/11801742.html
Copyright © 2011-2022 走看看