zoukankan      html  css  js  c++  java
  • Spark ML机器学习

    Spark提供了常用机器学习算法的实现, 封装于spark.mlspark.mllib中.

    spark.mllib是基于RDD的机器学习库, spark.ml是基于DataFrame的机器学习库.

    相对于RDD, DataFrame拥有更丰富的操作API, 可以进行更灵活的操作. 目前, spark.mllib已经进入维护状态, 不再添加新特性.

    本文将重点介绍pyspark.ml, 测试环境为Spark 2.1, Python API.

    首先介绍pyspark.ml中的几个基类:

    • ML DataSet: 即为pyspark.sql.DataFrame作为数据集使用

    • pyspark.ml.Transformer: 代表将数据集转换到另一个数据集的算法

    • pyspark.ml.Estimator: 代表根据数据和参数创建模型的算法,包含方法

      • fit(dataset, params): 根据训练数据集和参数进行训练, 返回训练好的模型对象
    • pyspark.ml.Model: 代表训练好的模型的基类, 通常由Estimator.fit()创建. 包含的方法有:

      • transform(df): 将输入数据集代入模型变换为输出数据集
      • save(path): 保存训练好的模型
      • load(path): 从文件中加载模型
    • pyspark.ml.Pipeline: 用于将多个步骤组合为管道进行处理, 可以建立线性管道和有向无环图管道.

    pyspark.ml下将不同算法封装到不同的包中:

    • pyspark.ml.linalg 线性代数工具包. 包括:
      • Vector
      • DenseVector
      • SparseVector
      • Matrix
      • DenseMatrix
      • SparseMatrix
    • pyspark.ml.feature特征和预处理算法包. 包括:
      • Tokenizer
      • Normalizer
      • StopWordsRemover
      • PCA
      • NGram
      • Word2Vec
    • pyspark.ml.classification分类算法包. 包括:
      • LogisticRegression
      • DecisionTreeClassifier
      • RandomForestClassifier
      • NaiveBayes
      • MultilayerPerceptronClassifier
      • OneVsRest
    • pyspark.ml.clustering 聚类算法包. 包括:
      • KMeans
      • LDA
    • pyspark.ml.regression回归算法包. 包括:
      • LinearRegression
      • GeneralizedLinearRegression
      • DecisionTreeRegressor
      • RandomForestRegressor
    • pyspark.ml.recommendation推荐系统算法包. 包括:
      • ALS
    • pyspark.ml.tuning 校验工具包
    • pyspark.ml.evaluation 评估工具包

    pyspark.ml中的算法大多数为Estimator的派生类. 大多数算法类均拥有对应的Model类.

    classification.NaiveBayesclassification.NaiveBayesModel. 算法类的fit方法可以生成对应的Model类.

    应用示例

    pyspark.ml使用了统一风格的接口,这里只展示部分算法.

    首先用NaiveBayes分类器做一个二分类:

    >>> from pyspark.sql import Row
    >>> from pyspark.ml.linalg import Vectors
    >>> df = spark.createDataFrame([
    ...     Row(label=0.0, weight=0.1, features=Vectors.dense([0.0, 0.0])),
    ...     Row(label=0.0, weight=0.5, features=Vectors.dense([0.0, 1.0])),
    ...     Row(label=1.0, weight=1.0, features=Vectors.dense([1.0, 0.0]))])
    >>> nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
    >>> model = nb.fit(df)  # 构造模型
    >>> test0 = sc.parallelize([Row(features=Vectors.dense([1.0, 0.0]))]).toDF()
    >>> result = model.transform(test0).head()  # 预测
    >>> result.prediction
    1.0
    >>> result.probability
    DenseVector([0.32..., 0.67...])
    >>> result.rawPrediction
    DenseVector([-1.72..., -0.99...])
    

    model.transform将输入的一行(Row)作为一个样本,产生一行输出. 这里我们只输入了一个测试样本, 所以直接使用head()取出唯一一行输出.

    使用LogisticRegression和OneVsRest做多分类:

    >>> from pyspark.sql import Row
    >>> from pyspark.ml.linalg import Vectors
    >>> df = sc.parallelize([
    ...     Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
    ...     Row(label=1.0, features=Vectors.sparse(2, [], [])),
    ...     Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
    >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
    >>> ovr = OneVsRest(classifier=lr)
    >>> model = ovr.fit(df)
    >>> # 进行预测
    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
    >>> model.transform(test0).head().prediction
    1.0
    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
    >>> model.transform(test1).head().prediction
    0.0
    >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
    >>> model.transform(test2).head().prediction
    2.0
    

    使用PCA进行降维:

    >>> from pyspark.ml.linalg import Vectors
    >>> data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    ...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    ...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
    >>> df = spark.createDataFrame(data,["features"])
    >>> pca = PCA(k=2, inputCol="features", outputCol="pca_features")
    >>> model = pca.fit(df)
    >>> model.transform(df).head().pca_features
    DenseVector([1.648..., -4.013...])
    

    EstimatorTransformer均为PipelineStage的派生类,pipeline由一系列Stage组成.调用pipeline对象的fit方法, 将会依次执行Stage并生成一个最终模型.

    >>>from pyspark.ml import Pipeline
    >>>from pyspark.ml.classification import LogisticRegression
    >>>from pyspark.ml.feature import HashingTF, Tokenizer
    >>> data = [
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0) ]
    >>> df = spark.createDataFrame(data, ["id", "text", "label"])
    >>> # build pipeline
    >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
    >>> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    >>> lr = LogisticRegression(maxIter=10, regParam=0.001)
    >>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    >>> # train
    >>> model = pipeline.fit(df)
    >>> data2 = [
           (4, "spark i j k"),
           (5, "l m n"),
           (6, "spark hadoop spark"),
           (7, "apache hadoop")
    ]
    >>> test = spark.createDataFrame(data2, ["id", "text"])
    >>> result = model.transform(test)
    >>> result = result.select("id", "text", "probability", "prediction")
    >>> result.collect()
    [Row(id=4, text=u'spark i j k', probability=DenseVector([0.1596, 0.8404]), prediction=1.0), 
    Row(id=5, text=u'l m n', probability=DenseVector([0.8378, 0.1622]), prediction=0.0), 
    Row(id=6, text=u'spark hadoop spark', probability=DenseVector([0.0693, 0.9307]), prediction=1.0), 
    Row(id=7, text=u'apache hadoop', probability=DenseVector([0.9822, 0.0178]), prediction=0.0)]
    

    本文示例来源于官方文档


    更多内容请参考:

  • 相关阅读:
    异常处理 try catch throw(C++)
    Kubernetes轻量级日志收集系统LokiStack
    第一章.java
    第四章.选择结构(二)
    java语法
    第三章if选择结构
    第二章.数据类型变量名和运算符
    【转载】跳槽七诫
    【转载】修改shell终端提示信息
    ubuntu11.10面板上输入法图标消失解决办法
  • 原文地址:https://www.cnblogs.com/Finley/p/6390530.html
Copyright © 2011-2022 走看看