zoukankan      html  css  js  c++  java
  • 自定义 spark transformer 和 estimator 的范例

    https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types

    要了解有关Spark ML所基于的数据集API的未来的更多信息,请查看Holden Karau和Seth Hendrickson的会话Spark Structured Streaming,以便在2017年3月14日至16日在Strata + Hadoop World San Jose 进行机器学习。是Python更多你的事情?2017年2月9日,请参阅Karau在2017年东部Spark Summit的调试 PySpark的演讲。

    您还可以在Holden Karau和Rachel Warren的“ 高性能Spark:扩展和优化Apache Spark的最佳实践 ”中了解更多信息。

    虽然Spark ML管道具有多种算法,但您可能会发现自己需要其他功能而无需离开管道模型。在Spark MLlib中,这不是什么大问题 - 您可以使用RDD转换手动实现算法并继续从那里开始。对于Spark ML管道,同样的方法可以工作,但是我们失去了一些很好的集成管道属性,包括自动运行元算法的能力,例如交叉验证参数搜索。在本文中,您将学习如何使用标准wordcount示例作为起点扩展Spark ML管道模型(人们永远无法逃避大数据wordcount示例的介绍)。

    要将自己的算法添加到Spark管道,您需要实现Estimator或Transformer实现PipelineStage接口。对于不需要培训的算法,您可以实现Transformer接口,对于经过培训的算法,您可以实现Estimator接口org.apache.spark.ml(两者都实现基础PipelineStage)。请注意,培训不仅限于复杂的机器学习模型; 甚至MinMaxScaler也需要培训来确定范围。如果他们需要培训,他们必须建造Estimator而不是Transformer。

    STRATA数据会议

    2014年3月25日至28日在旧金山举行的Strata数据会议
    最优价格将于1月11日结束
    注意
    PipelineStage直接使用不起作用,因为在管道内使用了适合的反射,假设所有阶段都是a Estimator或a Transformer。
    除了显而易见的transform或fit函数之外,所有管道阶段都需要提供transformSchema,copy构造函数或实现一个类,它为您提供这些 - 用于copy制作当前阶段的副本,任何新指定的参数合并在一起,并且可以简单地调用defaultCopy(除非你的类有特殊的构造函数注意事项)。

    显示了流水线​​阶段的开始以及复制委托 - transformSchema必须根据任何参数集和输入模式生成管道阶段的预期输出。大多数管道阶段只需添加新字段; 如果需要,很少删除以前的字段,但这有时会导致记录包含的数据多于下游所需的数据,从而对性能产生负面影响。如果您发现这是管道中的问题,您可以创建自己的阶段以删除不必要的字段。

    class HardCodedWordCountStage(override val uid: String) extends Transformer {
    def this() = this(Identifiable.randomUID("hardcodedwordcount"))

    def copy(extra: ParamMap): HardCodedWordCountStage = {
    defaultCopy(extra)
    }
    除了生成输出模式之外,该transformSchema函数还应验证输入模式是否适合该阶段(例如,输入列是预期类型)。

    这也是您应该对阶段参数执行验证的地方。

    transformSchema具有硬编码输入和输出列的字符串输入和矢量输出的简单说明如下。

    override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex("happy_pandas")
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
    throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField("happy_panda_counts", IntegerType, false))
    }
    使用该Transformer接口可以非常简单地实现不需要训练的算法。由于这是最简单的流水线阶段,因此您可以从实现一个简单的变换器开始,该变换器计算输入列上的字数。

    def transform(df: Dataset[_]): DataFrame = {
    val wordcount = udf { in: String => in.split(" ").size }
    df.select(col("*"),
    wordcount(df.col("happy_pandas")).as("happy_panda_counts"))
    }
    要充分利用管道接口,您需要使用params接口配置管道阶段。

    O'Reilly数据通讯
    获取O'Reilly数据通讯
    每周接收业内人士的见解 - 以及有关数据主题的独家内容,优惠等。

    你的邮件

    国家

    订阅
    请阅读我们的隐私政策。
    虽然params接口是公共的,但遗憾的是,Spark中常用的常见默认参数是私有的,因此最终会有一些代码重复。除了允许用户指定值之外,参数还可以包含一些基本验证逻辑(例如,正则化参数必须设置为非负值)。两个最常见的参数是输入列和输出列,您可以相对简单地添加到模型中。

    除了字符串参数之外,还可以使用任何其他类型,包括停用词之类的字符串列表,以及停用词之类的字符串。

    class ConfigurableWordCount(override val uid: String) extends Transformer {
    final val inputCol= new Param[String](this, "inputCol", "The input column")
    final val outputCol = new Param[String](this, "outputCol", "The output column")

    ; def setInputCol(value: String): this.type = set(inputCol, value)

    def setOutputCol(value: String): this.type = set(outputCol, value)

    def this() = this(Identifiable.randomUID("configurablewordcount"))

    def copy(extra: ParamMap): HardCodedWordCountStage = {
    defaultCopy(extra)
    }

    override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
    throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
    }

    def transform(df: Dataset[_]): DataFrame = {
    val wordcount = udf { in: String => in.split(" ").size }
    df.select(col("*"), wordcount(df.col($(inputCol))).as($(outputCol)))
    }
    }
    需要训练的算法可以使用Estimator接口实现- 尽管对于许多算法,org.apache.spark.ml.Predictor或者org.apache.spark.ml.classificationClassifier辅助类更容易实现。Estimator和Transformer接口之间的主要区别在于,不是直接在输入上表达转换,而是首先以train函数的形式进行训练。字符串索引器是您可以实现的最简单的估算器之一,虽然它已经在Spark中可用,但仍然是如何使用估计器接口的一个很好的例证。

    trait SimpleIndexerParams extends Params {
    final val inputCol= new Param[String](this, "inputCol", "The input column")
    final val outputCol = new Param[String](this, "outputCol", "The output column")
    }

    class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {

    def setInputCol(value: String) = set(inputCol, value)

    def setOutputCol(value: String) = set(outputCol, value)

    def this() = this(Identifiable.randomUID("simpleindexer"))

    override def copy(extra: ParamMap): SimpleIndexer = {
    defaultCopy(extra)
    }

    override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
    throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
    }

    override def fit(dataset: Dataset[]): SimpleIndexerModel = {
    import dataset.sparkSession.implicits.

    val words = dataset.select(dataset($(inputCol)).as[String]).distinct
    .collect()
    new SimpleIndexerModel(uid, words)
    ; }
    }

    class SimpleIndexerModel(
    override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {

    override def copy(extra: ParamMap): SimpleIndexerModel = {
    defaultCopy(extra)
    }

    private val labelToIndex: Map[String, Double] = words.zipWithIndex.
    map{case (x, y) => (x, y.toDouble)}.toMap

    override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
    throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
    }

    override def transform(dataset: Dataset[_]): DataFrame = {
    val indexer = udf { label: String => labelToIndex(label) }
    dataset.select(col("*"),
    indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))
    }
    }
    如果要实现迭代算法,您可能希望在未缓存输入数据时自动缓存输入数据,或者允许用户指定持久性级别。

    O'REILLY在线学习

    学得更快。深入挖掘。再看一点。
    加入O'Reilly的在线学习平台。立即免费试用,即时查找答案,或掌握新的有用的东西。

    学到更多
    所述Predictor接口将两个最常用的参数(输入和输出列)作为标签栏,设有柱,和预测列和自动处理的模式转换为我们。

    该Classifier界面做许多事一样,但它也增加了rawPredictionColumn,并提供工具来检测(班数getNumClasses),以及输入转换DataFrame到一个RDD LabeledPoints(使它更容易包装旧的MLlib分类算法)。

    如果要实现回归或群集接口,则不需要使用公共基本接口集,因此您需要使用通用Estimator接口。

    // Simple Bernouli Naive Bayes classifier - no sanity checks for brevity
    // Example only - not for production use.
    class SimpleNaiveBayes(val uid: String)
    extends Classifier[Vector,SimpleNaiveBayes,SimpleNaiveBayesModel] {

    def this() = this(Identifiable.randomUID("simple-naive-bayes"))

    override def train(ds: Dataset[]): SimpleNaiveBayesModel = {
    import ds.sparkSession.implicits.

    ds.cache()
    // Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead
    // Using the RDD approach is common when integrating with legacy machine learning code
    // or iterative algorithms which can create large query plans.
    // Here we use Datasets since neither of those apply.

    // Compute the number of documents
    val numDocs = ds.count
    // Get the number of classes.
    // Note this estimator assumes they start at 0 and go to numClasses
    val numClasses = getNumClasses(ds)
    // Get the number of features by peaking at the first row
    val numFeatures: Integer = ds.select(col($(featuresCol))).head
      .get(0).asInstanceOf[Vector].size
    // Determine the number of records for each class
    val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)
    val classCounts = groupedByLabel.agg(count("*").as[Long])
      .sort(col("value")).collect().toMap
    // Select the labels and features so we can more easily map over them.
    // Note: we do this as a DataFrame using the untyped API because the Vector
    // UDT is no longer public.
    val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))
    // Figure out the non-zero frequency of each feature for each label and
    // output label index pairs using a case clas to make it easier to work with.
    val labelCounts: Dataset[LabeledToken] = df.flatMap {
      case Row(label: Double, features: Vector) =>
        features.toArray.zip(Stream from 1)
          .filter{vIdx => vIdx._2 == 1.0}
          .map{case (v, idx) => LabeledToken(label, idx)}
    }
    // Use the typed Dataset aggregation API to count the number of non-zero
    // features for each label-feature index.
    val aggregatedCounts: Array[((Double,Integer),Long)] = labelCounts
      .groupByKey(x => (x.label, x.index))
      .agg(count("*").as[Long]).collect()
    
    val theta = Array.fill(numClasses)(new Array[Double](numFeatures))
    
    // Compute the denominator for the general prioirs
    val piLogDenom = math.log(numDocs + numClasses)
    // Compute the priors for each class
    val pi = classCounts.map{case(_, cc) =>
      math.log(cc.toDouble) - piLogDenom }.toArray
    
    // For each label/feature update the probabilities
    aggregatedCounts.foreach{case ((label, featureIndex), count) =>
      // log of number of documents for this label + 2.0 (smoothing)
      val thetaLogDenom = math.log(
        classCounts.get(label).map(_.toDouble).getOrElse(0.0) + 2.0)
      theta(label.toInt)(featureIndex) = math.log(count + 1.0) - thetaLogDenom
    }
    // Unpersist now that we are done computing everything
    ds.unpersist()
    // Construct a model
    new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),
      new DenseMatrix(numClasses, theta(0).length, theta.flatten, true))
    

    }

    override def copy(extra: ParamMap) = {
    defaultCopy(extra)
    }
    }

    // Simplified Naive Bayes Model
    case class SimpleNaiveBayesModel(
    override val uid: String,
    override val numClasses: Int,
    override val numFeatures: Int,
    val pi: Vector,
    val theta: DenseMatrix) extends
    ClassificationModel[Vector,SimpleNaiveBayesModel] {

    override def copy(extra: ParamMap) = {
    defaultCopy(extra)
    }

    // We have to do some tricks here because we are using Spark's
    // Vector/DenseMatrix calculations - but for your own model don't feel
    // limited to Spark's native ones.
    val negThetaArray = theta.values.map(v => math.log(1.0 - math.exp(v)))
    val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)
    val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)
    .map{case (v, nv) => v - nv}
    val thetaMinusNegTheta = new DenseMatrix(
    numClasses, numFeatures, thetaMinusNegThetaArray, true)
    val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
    val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray

    // Here is the prediciton functionality you need to implement - for ClassificationModels
    // transform automatically wraps this - but if you might benefit from broadcasting your model or
    // other optimizations you can also override transform.
    def predictRaw(features: Vector): Vector = {
    // Toy implementation - use BLAS or similar instead
    // the summing of the three vectors but the functionality isn't exposed.
    Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)
    .map{case (x, y) => x + y}.zip(negThetaSum).map{case (x, y) => x + y}
    )
    }
    }

  • 相关阅读:
    Visual Studio 2008 每日提示(四)
    修改XP注册到用户名和公司组织名
    Visual Studio技巧之打造拥有自己标识的代码模板
    收集的学习资料
    多个记录更新(存储过程)
    '1,2,3,68,10'转换为'1,2,3,6,7,8,10'
    .NET程序员面试的题一部 (转)
    [.net]DataGrid中绑定DropDownList[转]
    使用DELETE与TRUNCATE删除表所有行的区别
    sysobjects 各列的含义
  • 原文地址:https://www.cnblogs.com/wdmx/p/9981002.html
Copyright © 2011-2022 走看看