基于mllib的spark中文文本分类(朴素贝叶斯)
本文参考博客 https://blog.csdn.net/github_36326955/article/details/54891204
使用spark中ml包进行中文文本分类参见 https://www.cnblogs.com/DismalSnail/p/11802281.html
首先介绍一下文本分类的大致流程
- 预处理
- 中文分词
- 构建词向量空间
- 训练模型
- 用训练好的模型进行预测
- 通过预测结果对模型进行评估
预处理
- 语料库
- 文本格式转换
语料库
要进行文本分类,首先要有文本,复旦中文文本语料库
百度云盘链接:https://pan.baidu.com/s/1nKAmM8EuF54sgtMGhZN9tw
密码 ns8e
文本格式转换
由于下载的语料库是GBK格式的,为了处理方便,需要转成UTF-8的格式,转换代码如下
package com.classification.text
import java.io.File
import org.apache.commons.io.FileUtils //Java的文件处理工具包
object GBK2UTF {
def GBK2UTF8(GBKCorpusPath: String, UTF8CorpusPath: String): Unit = {
//打开根目录
val GBKCorpusDir: Array[File] = new File(GBKCorpusPath).listFiles()
//对应的UTF-8格式的目录是否存在,不存在新建
val UTFCorpusDir: File = new File(UTF8CorpusPath);
if (!UTFCorpusDir.exists()) {
UTFCorpusDir.mkdir()
}
//打开类别目录
for (gbkClassDir: File <- GBKCorpusDir) {
//记录目录路径,为创建UTF-8格式的文件夹和文件提供路径
val UTFClassDirPath: String = UTF8CorpusPath + gbkClassDir.getName
//UTF-8格式的类别目录是否存在,不存在新建
val UTFClassDir: File = new File(UTFClassDirPath)
if (!UTFClassDir.exists()) {
UTFClassDir.mkdir()
}
for (gbkText: File <- gbkClassDir.listFiles()) {
//将文件以GBK格式读取为字符串,转为UTF-8格式后写入新文件
FileUtils.write(new File(UTFClassDirPath + "/" + gbkText),
FileUtils.readFileToString(gbkText, "GBK"), "UTF-8")
}
}
}
def main(args: Array[String]): Unit = {
GBK2UTF8("./train_corpus/", "./utf_train_corpus/")
GBK2UTF8("./test_corpus/", "./utf_test_corpus/")
}
}
中文分词
- 分词工具介绍
- 选择Ansj作为分词工具,以及注意事项
- Ansj中文分词实现
分词工具介绍
中文分词的理论部分很多博客都有介绍,这里主要介绍代码实现(理论咱现在也不会,就会调用API)。如果用Python,分词一般选择jieba分词,jieba分词也有Java版的,但是用起来不是很方便。如果用Java或Scala,就要选择Java版的中文分词工具,我用的主要是Ansj和HanLP,两个分词工具可以百度,感觉HanLP比较强大。
选择Ansj作为分词工具,以及注意事项
本次实验选择Ansj作为中文文本分类的工具。注意:如果需要添加自定义词典,词典内的空白都必须是tab,但是如果使用Idea编辑词典文件,tab键默认为4个空格(这种细节注意不到会让人崩溃)
Ansj中文分词的实现
package com.classification.text
import java.io.File
import java.util
import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.commons.io.FileUtils
//scala集合与java集合转换的包,按住Ctrl点进源码,可以查看转换规则
import scala.collection.JavaConversions._
object WordSplit {
//分词函数
def corpusSegment(utfCorpusPath: String, utfSegmentPath: String, trainLabelListPath: String, trainSegmentPath: String): Unit = {
//计数用,统计样本个数
var count = 0
//存放标签的Java数组,这里使用java数组是为了方便写入文件
val labelList = new util.ArrayList[String]()
//存放分词后字符串的数组,同样为了方便写入文件
val contextList = new util.ArrayList[String]()
//打开根目录
val corpusDir: Array[File] = new File(utfCorpusPath).listFiles()
//类别目录
for (corpusClassDir: File <- corpusDir) {
//每一个文件
for (utfText <- corpusClassDir.listFiles()) {
count = count + 1
//调用分词方法
val textSeg: Result = ToAnalysis.parse(FileUtils.readFileToString(utfText)
.replace("
", "") //去除换行和回车
.replace("
", "") //去除单独的回车
.replace("
", "") //去除单独的换行
.replace(" ", "") //去除空格
.replace("u3000", "") //去除全角空格(中文空格)
.replace(" ", "") //去除制表符
.replaceAll(s"\pP|\pS|\pC|\pN|\pZ", "") //通过设置Unicode类别的相关正则去除符号
.trim
)
//读取停用词,就是一些对分类没有作用的词,去除可以对特征向量降维
val stopWordList: Seq[String] = FileUtils.readFileToString(new File("./stopWords/stop_word_chinese.txt"))
.split("
").toSeq
//新建停用词对象
val filter = new StopRecognition()
//加载停用词列表
filter.insertStopWords(stopWordList)
//去除停用词
val result: Result = textSeg.recognition(filter)
/**
*这里如果将每篇文章的分词单独写入一个文件,则在构建词向量时,spark
* 就要分别读取每篇文章的分词,而spark每读一个文件,就会就会产生一个RDD,
* 这样读取所有文本的分词就会产生巨量的RDD,这时把这些分词合并到一个集合中(巨量的RDD
* 合并成一个RDD)时,spark在构建DAG时就会爆掉(亲身经历,当时用的时RDD的union方法)
*/
//将分词内容加入列表
contextList.add(result.toStringWithOutNature)
//将标签加入列表,标签的顺序和文本分词后的顺序是对应的
labelList.add(corpusClassDir.getName)
}
}
println(count)
//将分词写入文件
FileUtils.writeLines(new File(trainSegmentPath), "UTF-8", contextList)
//将文本标签写入文件
FileUtils.writeLines(new File(trainLabelListPath), "UTF-8", labelList)
}
def main(args: Array[String]): Unit = {
//这里该了一些目录结构,对代码的功能没有影响
corpusSegment("./train/utf_train_corpus/", "./train/utf_train_segment/",
"./train/train_label_list.txt", "./train/train_seg.txt")
corpusSegment("./test/utf_test_corpus/", "./test/utf_test_segment/",
"./test/test_label_list.txt", "./test/test_seg.txt")
}
}
构建词向量空间、训练、预测、评估
分词完成后就可以构建词向量、训练、预测、评估了
package com.classification.text
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Classification {
//读取分词文件和标签文件,两个文件读取后都是RDD形式,元组的形式返回
def getDocumentsAndLabels(sc: SparkContext, segPath: String, labelListPath: String): (RDD[Seq[String]], Iterator[String]) = {
(sc.textFile(segPath).map(_.split(",").toSeq), sc.textFile(labelListPath).collect().toSeq.toIterator)
}
//训练函数
def train(sc: SparkContext, trainSegPath: String, trainLabelListPath: String): NaiveBayesModel = {
//读取训练集的分词和标签
val (documents, labelList) = getDocumentsAndLabels(sc, trainSegPath, trainLabelListPath)
//新建HashingTF类
val hashingTF: HashingTF = new HashingTF()
//计算TF值
val tf: RDD[Vector] = hashingTF.transform(documents)
//缓存,为了计算快,对功能没有影响
tf.cache()
//计算IDF值
val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
//计算TF-IDF值
val tfIdf: RDD[Vector] = idf.transform(tf)
//将TFIDF数据,结合标签,转为LabelPoint数据,LabelPoint是训练函数NaiveBayes.train()的输入数据格式
val training: RDD[LabeledPoint] = tfIdf.map {
vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
}
//训练函数训练,
NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
}
//测试函数,参数model为训练集训练后的模型
def test(sc: SparkContext, testSegPath: String, testLabelListPath: String, model: NaiveBayesModel): Double = {
//读取测试数据集分词和标签数据
val (documents, labelList) = getDocumentsAndLabels(sc, testSegPath, testLabelListPath)
//和训练的步骤差不多
val hashingTF: HashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()
val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
val tfIdf: RDD[Vector] = idf.transform(tf)
val test: RDD[LabeledPoint] = tfIdf.map {
vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
}
//预测
val predictionAndLabel: RDD[(Double, Double)] = test.map((p: LabeledPoint) => (model.predict(p.features), p.label))
//计算准确率
1.0 * predictionAndLabel.filter((x: (Double, Double)) => x._1 == x._2).count() / test.count()
}
//获取标签对应的Double数值,将标签中的数组作为标签对应的数值
//C11Space -> 11.0
def getDoubleOfLabel(label: String): Double = {
label.split("-")(0).tail.toDouble
}
def main(args: Array[String]): Unit = {
//新建spark上下文
val conf: SparkConf = new SparkConf().setAppName("Classification").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
//调用处理函数
println(test(sc, "./test/test_seg.txt",
"./test/test_label_list.txt",
train(sc,
"./train/train_seg.txt",
"./train/train_label_list.txt"
)
)
)
}
}
到此分词的步骤就结束了,要想提高分词的准确率可以尝试不同的分词工具和文本分类算法