zoukankan      html  css  js  c++  java
  • 用Spark-NLP建立文本分类模型

    作者|GUEST
    编译|VK
    来源|Analytics Vidhya

    概述

    • 在AWS电子病历上建立John Snow实验室的Spark NLP,并使用该库对BBC文章进行简单的文本分类。

    介绍

    自然语言处理是全球数据科学团队的重要过程之一。随着数据的不断增长,大多数组织已经转移到大数据平台,如apachehadoop和AWS、Azure和GCP等云产品。

    这些平台不仅能够处理大数据,使组织能够对非结构化数据(如文本分类)进行大规模分析。但在机器学习方面,大数据系统和机器学习工具之间仍然存在差距。

    流行的机器学习python库,如scikit-learn和Gensim,经过高度优化,可以在单节点计算机上执行,而不是为分布式环境设计的。

    Apache Spark MLlib是许多帮助弥合这一差距的工具之一,它提供了大多数机器学习模型,如线性回归、Logistic回归、支持向量机、随机森林、K-means、LDA等,以执行最常见的机器学习任务。

    除了机器学习算法,Spark MLlib还提供了大量的特征变换器,如Tokenizer、StopWordRemover、n-grams和countvector、TF-IDF和Word2Vec等。

    虽然这些转换器和提取器足以构建基本的NLP管道,但是要构建一个更全面和生产级的管道,我们需要更先进的技术,如词干分析、词法化、词性标记和命名实体识别。

    Spark NLP提供了各种注释器来执行高级NLP任务。有关更多信息,请在网站上查看注释器列表及其用法

    https://nlp.johnsnowlabs.com/docs/en/annotators。

    设置环境

    让我们继续看看如何在AWS EMR上设置Spark NLP。

    1.在启动EMR集群之前,我们需要创建一个引导操作。引导操作用于设置其他软件或自定义群集节点的配置。以下是可用于在EMR集群上设置Spark NLP的引导操作,

    #!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip
    #
    sudo python36 -m pip install pandas
    #
    sudo python36 -m pip install boto3
    #
    sudo python36 -m pip install re
    #
    sudo python36 -m pip install spark-nlp==2.4.5
    

    创建shell脚本之后,将该脚本复制到AWS S3中的一个位置。你还可以根据需要安装其他python包。

    2.我们可以使用AWS控制台、API或python中的boto3库来启动EMR集群。使用Python的好处是,无论何时需要实例化集群或将其添加到工作流中,都可以重用代码。

    下面是实例化EMR集群的python代码。

    import boto3region_name='region_name'def get_security_group_id(group_name, region_name):
        ec2 = boto3.client('ec2', region_name=region_name)
        response = ec2.describe_security_groups(GroupNames=[group_name])
        return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow(
            Name='cluster_name', # 更新值
            ReleaseLabel='emr-5.27.0',
            LogUri='s3_path_for_logs', # 更新值
            Instances={
                'InstanceGroups': [
                    {
                        'Name': "Master nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm5.2xlarge', # 根据要求进行变更
                        'InstanceCount': 1 #对于主节点高可用性,设置计数大于1
                    },
                    {
                        'Name': "Slave nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm5.2xlarge', # 根据要求进行变更
                        'InstanceCount': 2
                    }
                ],
                'KeepJobFlowAliveWhenNoSteps': True,
                'Ec2KeyName' : 'key_pair_name', # 更新值
                'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
                'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
            },
            BootstrapActions=[    {
                        'Name':'install_dependencies',
                        'ScriptBootstrapAction':{
                                'Args':[],
                                'Path':'path_to_bootstrapaction_on_s3' # 更新值
                                }
                    }],
            Steps = [],
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
            Applications=[
                { 'Name': 'hadoop' },
                { 'Name': 'spark' },
                { 'Name': 'hive' },
                { 'Name': 'zeppelin' },
                { 'Name': 'presto' }
            ],
            Configurations=[
                # YARN
                {
                    "Classification": "yarn-site", 
                    "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",
                                   "yarn.nodemanager.pmem-check-enabled": "false",
                                   "yarn.nodemanager.vmem-check-enabled": "false"}
                },
                
                # HADOOP
                {
                    "Classification": "hadoop-env", 
                    "Configurations": [
                            {
                                "Classification": "export", 
                                "Configurations": [], 
                                "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
                            }
                        ], 
                    "Properties": {}
                },
                
                # SPARK
                {
                    "Classification": "spark-env", 
                    "Configurations": [
                            {
                                "Classification": "export", 
                                "Configurations": [], 
                                "Properties": {"PYSPARK_PYTHON":"/usr/bin/python3",
                                               "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
                            }
                        ], 
                    "Properties": {}
                },
                {
                    "Classification": "spark",
                    "Properties": {"maximizeResourceAllocation": "true"},
                    "Configurations": []
                 },
                {
                    "Classification": "spark-defaults",
                    "Properties": {
                        "spark.dynamicAllocation.enabled": "true" #default is also true
                    }
                }
            ]
        )
    

    注意:请确保你对用于日志记录和存储引导操作脚本的S3 bucket具有正确的访问权限。

    基于Spark-NLP的BBC文章文本分类

    现在我们已经准备好集群了,让我们使用Spark NLP和Spark MLlib在BBC数据上构建一个简单的文本分类示例。

    1.初始化Spark

    我们将导入所需的库并使用不同的配置参数初始化spark会话。配置值取决于我的本地环境。相应地调整参数。

    # 导入Spark NLP
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline# 使用Spark NLP启动Spark会话
    #spark = sparknlp.start()spark = SparkSession.builder 
        .appName("BBC Text Categorization")
        .config("spark.driver.memory","8G") change accordingly
        .config("spark.memory.offHeap.enabled",True)
        .config("spark.memory.offHeap.size","8G") 
        .config("spark.driver.maxResultSize", "2G") 
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")
        .config("spark.kryoserializer.buffer.max", "1000M")
        .config("spark.network.timeout","3600s")
        .getOrCreate()
    

    2.加载文本数据

    我们将使用BBC的数据。你可以从这个链接下载数据。下载以下数据后,使用spark代码加载;

    https://www.kaggle.com/yufengdev/bbc-text-categorization?#Get-the-data

    # 文件位置和类型
    file_location = r'path	obc-text.csv'
    file_type = "csv"# CSV
    infer_schema = "true"
    first_row_is_header = "true"
    delimiter = ","df = spark.read.format(file_type) 
      .option("inferSchema", infer_schema) 
      .option("header", first_row_is_header) 
      .option("sep", delimiter) 
      .load(file_location)df.count()
    

    3.将数据集拆分为训练集和测试集

    与python使用scikit learn分割数据不同,Spark Dataframe有一个内置函数randomSplit()来执行相同的操作。

    (trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)
    

    randomSplit()函数需要两个参数viz。权重数组和seed。在我们的例子中,我们将使用70/30分割,其中70%是训练数据,30%是测试数据。

    4.使用Spark NLP的NLP管道

    让我们继续使用Spark NLP构建NLP管道。Spark NLP最大的优点之一是它与Spark MLLib模块本机集成,有助于构建由transformers和estimators组成的综合ML管道。

    这个管道可以包括诸如CountVectorizer或HashingTF和IDF之类的特征提取模块。我们还可以在这个管道中包含一个机器学习模型。

    下面是由具有特征提取和机器学习模型的NLP管道组成的示例;

    from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator# 转换text列为nlp文件
    document_assembler = DocumentAssembler() 
        .setInputCol("text") 
        .setOutputCol("document")#将文档转换为标识数组
    tokenizer = Tokenizer() 
      .setInputCols(["document"]) 
      .setOutputCol("token")
     
    # 清理标识
    normalizer = Normalizer() 
        .setInputCols(["token"]) 
        .setOutputCol("normalized")# 删除停用词
    stopwords_cleaner = StopWordsCleaner()
          .setInputCols("normalized")
          .setOutputCol("cleanTokens")
          .setCaseSensitive(False)
    stemmer = Stemmer() 
        .setInputCols(["cleanTokens"]) 
        .setOutputCol("stem")# 将自定义文档结构转换为标识数组。
    finisher = Finisher() 
        .setInputCols(["stem"]) 
        .setOutputCols(["token_features"]) 
        .setOutputAsArray(True) 
        .setCleanAnnotations(False)# 生成频率
    hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)# 生成逆文档频率
    idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)# 将标签(字符串)转换为整数。
    label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")# 定义一个简单的多项式逻辑回归模型。尝试不同的超参数组合,看看哪个更适合你的数据。你也可以尝试不同的算法来比较分数。
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)# 将索引(整数)转换为相应的类标签
    label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")# 定义nlp管道
    nlp_pipeline = Pipeline(
        stages=[document_assembler, 
                tokenizer,
                normalizer,
                stopwords_cleaner, 
                stemmer, 
                finisher,
                hashingTF,
                idf,
                label_stringIdx,
                lr,
                label_to_stringIdx])
    

    5.训练模型

    现在我们的NLP管道已经准备好了,让我们根据训练数据训练我们的模型。

    # 在训练数据上拟合管道
    pipeline_model = nlp_pipeline.fit(trainingData)
    

    6.执行预测

    一旦训练完成,我们就可以预测测试数据上的类标签。

    # 对测试数据进行预测
    predictions =  pipeline_model.transform(testData)
    

    7. 评估模型

    对训练后的模型进行评估对于理解模型如何在看不见的数据上运行是非常重要的。我们将看到3个流行的评估指标,准确度、精确度和召回率。

    1. 准确度
    # 导入evaluator
    from pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy = %g" % (accuracy))
    print("Test Error = %g " % (1.0 - accuracy))
    

    1. 精确度
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy = %g" % (accuracy))
    print("Test Error = %g " % (1.0 - accuracy))
    

    1. 召回率
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="weightedRecall")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy = %g" % (accuracy))
    print("Test Error = %g " % (1.0 - accuracy))
    

    根据业务用例,你可以决定使用哪个度量来评估模型。

    例如.如果一个机器学习模型被设计用来根据某些参数来检测癌症,那么最好使用召回率,因为公司无法承受假负例(一个患有癌症但模型没有检测到癌症的人),而如果机器学习模型旨在生成用户推荐,公司可以负担得起误报(10条建议中有8条符合用户配置文件),因此可以使用精确度作为评估指标。

    8. 保存管道模型

    在成功地训练、测试和评估模型之后,你可以将模型保存到磁盘,并在不同的Spark应用程序中使用它。要将模型保存到光盘,请使用以下代码;

    pipeline_model.save('/path/to/storage_location')
    

    结论

    Spark NLP提供了大量的注释器和转换器来构建数据预处理管道。Sparl NLP与Spark MLLib无缝集成,使我们能够在分布式环境中构建端到端的自然语言处理项目。

    在本文中,我们研究了如何在AWS EMR上安装Spark NLP并实现了BBC数据的文本分类。我们还研究了Spark MLlib中的不同评估指标,并了解了如何存储模型以供进一步使用。

    希望你喜欢这篇文章。

    原文链接:https://www.analyticsvidhya.com/blog/2020/07/build-text-categorization-model-with-spark-nlp/

    欢迎关注磐创AI博客站:
    http://panchuang.net/

    sklearn机器学习中文官方文档:
    http://sklearn123.com/

    欢迎关注磐创博客资源汇总站:
    http://docs.panchuang.net/

  • 相关阅读:
    C++——STL内存清除
    c++——智能指针学习(unique_ptr)
    linux下将tomcat加入服务
    linux下oracle远程连接的问题
    oracle计算容量的方式
    oracle删除表的方式
    阻塞与非阻塞的区别
    java中queue的使用
    yum源
    VMware Tools 安装
  • 原文地址:https://www.cnblogs.com/panchuangai/p/13356061.html
Copyright © 2011-2022 走看看