zoukankan      html  css  js  c++  java
  • PySpark 入门

    1.wordCount 

     1 from __future__ import print_function
     2 
     3 import sys
     4 from operator import add
     5 
     6 from pyspark import SparkContext
     7 
     8 
     9 if __name__ == "__main__":
    10 if len(sys.argv) != 2:
    11 print("Usage: wordcount <file>", file=sys.stderr)
    12 exit(-1)
    13 sc = SparkContext(appName="PythonWordCount")
    14 lines = sc.textFile(sys.argv[1], 1)
    15 counts = lines.flatMap(lambda x: x.split(' ')) 
    16 .map(lambda x: (x, 1)) 
    17 .reduceByKey(add)
    18 output = counts.collect()
    19 for (word, count) in output:
    20 print("%s: %i" % (word, count))
    21 
    22 sc.stop()

     

    2. Sql.py

    Sql介绍了DataFrame的使用方法

    from __future__ import print_function
    
    import os
    import sys
    
    
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
    
    
    if __name__ == "__main__":
        sc = SparkContext(appName="PythonSQL")
        sqlContext = SQLContext(sc)
    
        # RDD is created from a list of rows
        some_rdd = sc.parallelize([Row(name="John", age=19),
                                  Row(name="Smith", age=23),
                                  Row(name="Sarah", age=18)])
        # Infer schema from the first row, create a DataFrame and print the schema
        some_df = sqlContext.createDataFrame(some_rdd)
        some_df.printSchema()
    
        # Another RDD is created from a list of tuples
        another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
        # Schema with two fields - person_name and person_age
        schema = StructType([StructField("person_name", StringType(), False),
                            StructField("person_age", IntegerType(), False)])
        # Create a DataFrame by applying the schema to the RDD and print the schema
        another_df = sqlContext.createDataFrame(another_rdd, schema)
        another_df.printSchema()
        # root
        #  |-- age: integer (nullable = true)
        #  |-- name: string (nullable = true)
    
        # A JSON dataset is pointed to by path.
    

    3. Sort

    sort实现了排序功能,主要通过sortByKey, 也可以使用SortWith, 注意如果数据量特别大,不要使用collect, 而是应该将rdd repatition为1个分区然后保存在hdfs上使用

    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage: sort <file>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonSort")
        lines = sc.textFile(sys.argv[1], 1)
        sortedCount = lines.flatMap(lambda x: x.split(' ')) 
            .map(lambda x: (int(x), 1)) 
            .sortByKey(lambda x: x)
        # This is just a demo on how to bring all the sorted data back to a single node.
        # In reality, we wouldn't want to collect all the data to the driver node.
        output = sortedCount.collect()
        for (num, unitcount) in output:
            print(num)
    
        sc.stop()
    

    4. LR回归

    from __future__ import print_function
    
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    
    
    D = 10  # Number of dimensions
    
    # Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
    # make further computations faster.
    # The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
    # into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
    def readPointBatch(iterator):
        strs = list(iterator)
        matrix = np.zeros((len(strs), D + 1))
        for i, s in enumerate(strs):
            matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
        return [matrix]
    
    if __name__ == "__main__":
    
        if len(sys.argv) != 3:
            print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
            exit(-1)
    
        print("""WARN: This is a naive implementation of Logistic Regression and is
          given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
          to see how MLlib's implementation is used.""", file=sys.stderr)
        sc = SparkContext(appName="PythonLR")
        points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
        iterations = int(sys.argv[2])
    
        # Initialize w to a random value
        w = 2 * np.random.ranf(size=D) - 1
        print("Initial w: " + str(w))
    
        # Compute logistic regression gradient for a matrix of data points
        def gradient(matrix, w):
            Y = matrix[:, 0]    # point labels (first column of input file)
            X = matrix[:, 1:]   # point coordinates
            # For each point (x, y), compute gradient function, then sum these up
            return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
    
        def add(x, y):
            x += y
            return x
    
        for i in range(iterations):
            print("On iteration %i" % (i + 1))
            w -= points.map(lambda m: gradient(m, w)).reduce(add)
    
        print("Final w: " + str(w))
    
        sc.stop()
    

     

    pyspark递交到yarn上运行

    /home/hadoop/soft/spark/bin/spark-submit

    --master yarn

    --deploy-mode cluster  

    --num-executors 1  

    --executor-memory 1G  

    wordCount.py

  • 相关阅读:
    Apache conf文件配置个人总结
    JavaScript模块化编程之require.js与sea.js
    Huffman树与最优二叉树续
    数据结构之Huffman树与最优二叉树
    浅谈JavaScript中的字符串操作
    浅谈二叉树遍历的栈方法
    浅谈JavaScript中的柯里化函数
    JavaScript之再谈回调与闭包
    浅谈JavaScript中typeof与instanceof的区别
    Golang解压文件,带目录
  • 原文地址:https://www.cnblogs.com/energy1010/p/10161475.html
Copyright © 2011-2022 走看看