zoukankan      html  css  js  c++  java
  • 利用Spark-mllab进行聚类,分类,回归分析的代码实现(python)

          Spark作为一种开源集群计算环境,具有分布式的快速数据处理能力。而Spark中的Mllib定义了各种各样用于机器学习的数据结构以及算法。Python具有Spark的API。需要注意的是,Spark中,所有数据的处理都是基于RDD的。

    首先举一个聚类方面的详细应用例子Kmeans:

       下面代码是一些基本步骤,包括外部数据,RDD预处理,训练模型,预测。

    #coding:utf-8
    from numpy import array
    from math import sqrt
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans, KMeansModel
    
    
    if __name__ == "__main__":
        sc = SparkContext(appName="KMeansExample",master='local')  # SparkContext
    
    
        # 读取并处理数据
        data = sc.textFile("./kmeans_data.txt")
        print data.collect()
        parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
    
        # 训练数据
        print parsedData.collect()
        clusters = KMeans.train(parsedData, k=2, maxIterations=10,
                                runs=10, initializationMode="random")
    
        #求方差之和
        def error(point):
            center = clusters.centers[clusters.predict(point)]
            return sqrt(sum([x**2 for x in (point - center)]))
        WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    
        print("Within Set Sum of Squared Error = " + str(WSSSE))
    
        #聚类结果
        def sort(point):
            return clusters.predict(point)
        clusters_result = parsedData.map(sort)
        # Save and load model
        # $example off$
        print '聚类结果:'
        print clusters_result.collect()
        sc.stop()
    

    可以看到在利用Spark进行机器学习时,我调用了一个外部的开源包numpy,并利用了数组作为数据结构。而在Mllib中其实已经定义了各种用于机器学习的数据结构,下面简单介绍两种在分类和回归分析中可以用到的DS。

    稀疏向量(SparseVector):稀疏向量是指向量元素中有许多值是0的向量。

    其初始化与简单操作如下:  

    # coding:utf-8
    from pyspark.mllib.linalg import *
    v0 = SparseVector(4, [1, 2], [2, 3.0])  # 稀疏向量,第一个参数为维度,第二个参数是非0维度的下标的集合,第三个参数是非0维度的值的集合
    v1 = SparseVector(4,{1: 3, 2: 4}) # 第一个参数是维度,第二个参数是下标和维度组成的字典
    print v0.dot(v1)  # 计算点积
    print v0.size  # 向量维度
    print v0.norm(0)  # 返回维度0的值
    print v0.toArray()  # 转化为array
    print v0.squared_distance(v1)  # 欧式距离
    

      spark中的稀疏向量可以利用list或者dict进行初始化。

    向量标签(Labeled  point):向量标签就是在向量和标签的组合,分类和回归中,标签可以作为分类中的类别,也可以作为回归中的实际值。

    from pyspark.mllib.regression import LabeledPoint
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(4.0, [1.0, 3.0]),
      LabeledPoint(8.0, [2.0, 3.0]),
      LabeledPoint(10.0, [3.0, 4.0])]
    print data[0].features
    print data[0].label
    

    下面是mllib中用于回归分析的一些基本实现(线性回归,岭回归):

    # coding:UTF-8
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.regression import LinearRegressionWithSGD
    from pyspark.context import SparkContext
    
    # ----------------线性回归--------------
    
    import numpy as np
    sc = SparkContext(master='local',appName='Regression')
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(2.0, [1.0, 1.4]),
      LabeledPoint(4.0, [2.0, 1.9]),
      LabeledPoint(6.0, [3.0, 4.0])]  # 训练集
    lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, initialWeights=np.array([1.0,1.0]))
    print lrm.predict(np.array([2.0,1.0]))  # 利用训练出的回归模型进行预测
    
    import os, tempfile
    from pyspark.mllib.regression import LinearRegressionModel
    from pyspark.mllib.linalg import SparseVector
    
    path = tempfile.mkdtemp()
    lrm.save(sc, path)  # 将模型保存至外存
    sameModel = LinearRegressionModel.load(sc, path)  # 读取模型
    print sameModel.predict(SparseVector(2, {0: 100.0, 1: 150}))  # 利用稀疏向量作为数据结构,返回单个预测值
    test_set = []
    for i in range(100):
      for j in range(100):
        test_set.append(SparseVector(2, {0: i,1: j}))
    print sameModel.predict(sc.parallelize(test_set)).collect()  # 预测多值,返回一个RDD数据集
    print sameModel.weights  # 返回参数
    
    
    # -----------------岭回归------------------
    
    from pyspark.mllib.regression import RidgeRegressionWithSGD
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(4.0, [1.0, 3.0]),
      LabeledPoint(8.0, [2.0, 3.0]),
      LabeledPoint(10.0, [3.0, 4.0])]
    train_set = sc.parallelize(data)
    rrm = RidgeRegressionWithSGD.train(train_set, iterations=100, initialWeights=np.array([1.0,1.0]))
    test_set = []
    for i in range(100):
      for j in range(100):
        test_set.append(np.array([i, j]))
    print rrm.predict(sc.parallelize(test_set)).collect()
    print rrm.weights
    

     上述代码只是让大家弄懂一下简单的操作,对于数据的预处理没有在RDD的基础上做。

    下面是一些分类算法的基本实现:

    # coding:utf-8
    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint
    
    print '-------逻辑回归-------'
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    sc = SparkContext(appName="LRWSGD", master='local')
    dataset = []
    for i in range(100):
        for j in range(100):
            dataset.append([i,j])
    dataset = sc.parallelize(dataset)  # 并行化数据,转化为RDD
    
    data =[LabeledPoint(0.0, [0.0, 100.0]),LabeledPoint(1.0, [100.0, 0.0]),]
    
    lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)  # 第二个参数是迭代次数
    print lrm.predict(dataset).collect()
    
    lrm.clearThreshold()
    print lrm.predict([0.0, 1.0])
    # ----------------------------------------------------------
    from pyspark.mllib.linalg import SparseVector
    from numpy import array
    sparse_data = [
        LabeledPoint(0.0, SparseVector(2, {0: 0.0, 1: 0.0})),
        LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ]
    train = sc.parallelize(sparse_data)
    lrm = LogisticRegressionWithSGD.train(train, iterations=10)
    print lrm.predict(array([0.0, 1.0]))  # 对单个数组进行预测
    print lrm.predict(SparseVector(2, {1: 1.0}))  # 对单个稀疏向量进行预测
    
    print '------svm-------'
    
    from pyspark.mllib.classification import SVMWithSGD
    svm = SVMWithSGD.train(train,iterations=10)
    print svm.predict(SparseVector(2, {1: 1.0}))
    
    print '------bayes------'
    from pyspark.mllib.classification import NaiveBayes
    nb = NaiveBayes.train(train)
    print nb.predict(SparseVector(2, {1: 1.0}))

    版权都是我所有的,(*^__^*) 哈哈哈~

  • 相关阅读:
    BGP的属性与配置
    IS-IS协议的简单设置
    ospf中建立虚链路、ospf与rip的重分发 stup与nssa区域的建立
    静态路由 默认路由 浮动路由配置
    centos7防火墙机制iptables与ebtables差别
    centos7虚拟机qemu学习
    centos7安装vncserver(windows控制其图形化界面)
    centos7虚拟机扩容
    centos7安装graylog
    centos7修改网卡
  • 原文地址:https://www.cnblogs.com/adienhsuan/p/5654481.html
Copyright © 2011-2022 走看看