zoukankan      html  css  js  c++  java
  • 利用Spark-mllab进行聚类,分类,回归分析的代码实现(python)

          Spark作为一种开源集群计算环境,具有分布式的快速数据处理能力。而Spark中的Mllib定义了各种各样用于机器学习的数据结构以及算法。Python具有Spark的API。需要注意的是,Spark中,所有数据的处理都是基于RDD的。

    首先举一个聚类方面的详细应用例子Kmeans:

       下面代码是一些基本步骤,包括外部数据,RDD预处理,训练模型,预测。

    #coding:utf-8
    from numpy import array
    from math import sqrt
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans, KMeansModel
    
    
    if __name__ == "__main__":
        sc = SparkContext(appName="KMeansExample",master='local')  # SparkContext
    
    
        # 读取并处理数据
        data = sc.textFile("./kmeans_data.txt")
        print data.collect()
        parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
    
        # 训练数据
        print parsedData.collect()
        clusters = KMeans.train(parsedData, k=2, maxIterations=10,
                                runs=10, initializationMode="random")
    
        #求方差之和
        def error(point):
            center = clusters.centers[clusters.predict(point)]
            return sqrt(sum([x**2 for x in (point - center)]))
        WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    
        print("Within Set Sum of Squared Error = " + str(WSSSE))
    
        #聚类结果
        def sort(point):
            return clusters.predict(point)
        clusters_result = parsedData.map(sort)
        # Save and load model
        # $example off$
        print '聚类结果:'
        print clusters_result.collect()
        sc.stop()
    

    可以看到在利用Spark进行机器学习时,我调用了一个外部的开源包numpy,并利用了数组作为数据结构。而在Mllib中其实已经定义了各种用于机器学习的数据结构,下面简单介绍两种在分类和回归分析中可以用到的DS。

    稀疏向量(SparseVector):稀疏向量是指向量元素中有许多值是0的向量。

    其初始化与简单操作如下:  

    # coding:utf-8
    from pyspark.mllib.linalg import *
    v0 = SparseVector(4, [1, 2], [2, 3.0])  # 稀疏向量,第一个参数为维度,第二个参数是非0维度的下标的集合,第三个参数是非0维度的值的集合
    v1 = SparseVector(4,{1: 3, 2: 4}) # 第一个参数是维度,第二个参数是下标和维度组成的字典
    print v0.dot(v1)  # 计算点积
    print v0.size  # 向量维度
    print v0.norm(0)  # 返回维度0的值
    print v0.toArray()  # 转化为array
    print v0.squared_distance(v1)  # 欧式距离
    

      spark中的稀疏向量可以利用list或者dict进行初始化。

    向量标签(Labeled  point):向量标签就是在向量和标签的组合,分类和回归中,标签可以作为分类中的类别,也可以作为回归中的实际值。

    from pyspark.mllib.regression import LabeledPoint
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(4.0, [1.0, 3.0]),
      LabeledPoint(8.0, [2.0, 3.0]),
      LabeledPoint(10.0, [3.0, 4.0])]
    print data[0].features
    print data[0].label
    

    下面是mllib中用于回归分析的一些基本实现(线性回归,岭回归):

    # coding:UTF-8
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.regression import LinearRegressionWithSGD
    from pyspark.context import SparkContext
    
    # ----------------线性回归--------------
    
    import numpy as np
    sc = SparkContext(master='local',appName='Regression')
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(2.0, [1.0, 1.4]),
      LabeledPoint(4.0, [2.0, 1.9]),
      LabeledPoint(6.0, [3.0, 4.0])]  # 训练集
    lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, initialWeights=np.array([1.0,1.0]))
    print lrm.predict(np.array([2.0,1.0]))  # 利用训练出的回归模型进行预测
    
    import os, tempfile
    from pyspark.mllib.regression import LinearRegressionModel
    from pyspark.mllib.linalg import SparseVector
    
    path = tempfile.mkdtemp()
    lrm.save(sc, path)  # 将模型保存至外存
    sameModel = LinearRegressionModel.load(sc, path)  # 读取模型
    print sameModel.predict(SparseVector(2, {0: 100.0, 1: 150}))  # 利用稀疏向量作为数据结构,返回单个预测值
    test_set = []
    for i in range(100):
      for j in range(100):
        test_set.append(SparseVector(2, {0: i,1: j}))
    print sameModel.predict(sc.parallelize(test_set)).collect()  # 预测多值,返回一个RDD数据集
    print sameModel.weights  # 返回参数
    
    
    # -----------------岭回归------------------
    
    from pyspark.mllib.regression import RidgeRegressionWithSGD
    data = [
      LabeledPoint(1.0, [1.0, 1.0]),
      LabeledPoint(4.0, [1.0, 3.0]),
      LabeledPoint(8.0, [2.0, 3.0]),
      LabeledPoint(10.0, [3.0, 4.0])]
    train_set = sc.parallelize(data)
    rrm = RidgeRegressionWithSGD.train(train_set, iterations=100, initialWeights=np.array([1.0,1.0]))
    test_set = []
    for i in range(100):
      for j in range(100):
        test_set.append(np.array([i, j]))
    print rrm.predict(sc.parallelize(test_set)).collect()
    print rrm.weights
    

     上述代码只是让大家弄懂一下简单的操作,对于数据的预处理没有在RDD的基础上做。

    下面是一些分类算法的基本实现:

    # coding:utf-8
    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint
    
    print '-------逻辑回归-------'
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    sc = SparkContext(appName="LRWSGD", master='local')
    dataset = []
    for i in range(100):
        for j in range(100):
            dataset.append([i,j])
    dataset = sc.parallelize(dataset)  # 并行化数据,转化为RDD
    
    data =[LabeledPoint(0.0, [0.0, 100.0]),LabeledPoint(1.0, [100.0, 0.0]),]
    
    lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)  # 第二个参数是迭代次数
    print lrm.predict(dataset).collect()
    
    lrm.clearThreshold()
    print lrm.predict([0.0, 1.0])
    # ----------------------------------------------------------
    from pyspark.mllib.linalg import SparseVector
    from numpy import array
    sparse_data = [
        LabeledPoint(0.0, SparseVector(2, {0: 0.0, 1: 0.0})),
        LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
        LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
        LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ]
    train = sc.parallelize(sparse_data)
    lrm = LogisticRegressionWithSGD.train(train, iterations=10)
    print lrm.predict(array([0.0, 1.0]))  # 对单个数组进行预测
    print lrm.predict(SparseVector(2, {1: 1.0}))  # 对单个稀疏向量进行预测
    
    print '------svm-------'
    
    from pyspark.mllib.classification import SVMWithSGD
    svm = SVMWithSGD.train(train,iterations=10)
    print svm.predict(SparseVector(2, {1: 1.0}))
    
    print '------bayes------'
    from pyspark.mllib.classification import NaiveBayes
    nb = NaiveBayes.train(train)
    print nb.predict(SparseVector(2, {1: 1.0}))

    版权都是我所有的,(*^__^*) 哈哈哈~

  • 相关阅读:
    DIY 作品 及 维修 不定时更新
    置顶,博客中所有源码 github
    openwrt PandoraBox PBR-M1 极路由4 HC5962 更新固件
    使用 squid 共享 虚拟专用网至局域网
    第一次参加日语能力测试 N5
    libx264 libfdk_aac 编码 解码 详解
    开发RTSP 直播软件 H264 AAC 编码 live555 ffmpeg
    MFC Camera 摄像头预览 拍照
    http2 技术整理 nginx 搭建 http2 wireshark 抓包分析 server push 服务端推送
    plist 图集 php 批量提取 PS 一个个切
  • 原文地址:https://www.cnblogs.com/adienhsuan/p/5654481.html
Copyright © 2011-2022 走看看