zoukankan      html  css  js  c++  java
  • 构建机器学习工作流

    #导入相关库
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
     
    #为spark的SparkSession对象
    spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
    #构建训练数据集
    training=spark.createDataFrame([
    (0,"a b c d e spark",1.0),
    (1,"b d",0.0),
    (2,"spark f g h",1.0),
    (3,"hadoop mapreduce",0.0)],["id","text","label"])
    #定义 Pipeline 中的各个工作流阶段PipelineStage,包括转换器和评估器,具体的,包含tokenizer, hashingTF和lr三个步骤。
    tokenizer = Tokenizer(inputCol="text",outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter =10, regParam=0.001)
    #按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。
    #现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer
    #model的类型是一个PipelineModel
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    model = pipeline.fit(training)
     
    #先构建测试数据。
    test = spark.createDataFrame([
    (4,"spark i j k"),
    (5," l m n"),
    (6,"spark hadoop spark"),
    (7,"apache hadoop")],["id","text"])
    #调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果
    prediction =model.transform(test)
    selected = prediction.select("id","text","probability","prediction")
    for row in selected.collect():
    rid, text, prob, prediction =row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
    效果图
     
  • 相关阅读:
    ASP.NET应用程序与页面生命周期
    阻塞分析
    性能和异常日志
    solr 搜索引擎及搜索推荐应用
    solr 搜索引擎
    分布式缓存地址
    Windows平台分布式架构实践
    职责链模式vs状态模式区别
    HBase
    单例模式
  • 原文地址:https://www.cnblogs.com/SoftwareBuilding/p/9459315.html
Copyright © 2011-2022 走看看