1. wordCount
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    sc.stop()
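For a small input such as the single line "a b a", this prints "a: 2" and "b: 1"; the output order is not guaranteed, since reduceByKey does not sort its results.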
2. Sql.py
Sql.py demonstrates basic DataFrame usage: one DataFrame is created by inferring the schema from an RDD of Rows, another by applying an explicit StructType schema to an RDD of tuples.
from __future__ import print_function

import os
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType


if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sqlContext = SQLContext(sc)

    # RDD is created from a list of rows
    some_rdd = sc.parallelize([Row(name="John", age=19),
                               Row(name="Smith", age=23),
                               Row(name="Sarah", age=18)])
    # Infer schema from the first row, create a DataFrame and print the schema
    some_df = sqlContext.createDataFrame(some_rdd)
    some_df.printSchema()
    # root
    #  |-- age: integer (nullable = true)
    #  |-- name: string (nullable = true)

    # Another RDD is created from a list of tuples
    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
    # Schema with two fields - person_name and person_age
    schema = StructType([StructField("person_name", StringType(), False),
                         StructField("person_age", IntegerType(), False)])
    # Create a DataFrame by applying the schema to the RDD and print the schema
    another_df = sqlContext.createDataFrame(another_rdd, schema)
    another_df.printSchema()

    # A JSON dataset is pointed to by path.
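The listing is cut off after the JSON comment. A minimal sketch of the step it introduces, assuming Spark 1.4+ (sqlContext.read.json), the sqlContext from the listing above, and a people.json file of name/age records at a path of your own (the path below is a placeholder):

    # Create a DataFrame from a JSON file; "people.json" is a placeholder path
    people = sqlContext.read.json("people.json")
    people.printSchema()
    # DataFrames can also be queried with SQL once registered as a temporary table
    people.registerTempTable("people")
    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    for row in teenagers.collect():
        print(row)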
3. Sort
Sort implements sorting, mainly via sortByKey (Scala collections also offer sortWith). Note that when the data volume is very large you should not use collect; instead, write the sorted RDD to HDFS as a single partition (see the sketch after the listing below).
from __future__ import print_function

import sys

from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: sort <file>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonSort")
    lines = sc.textFile(sys.argv[1], 1)
    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
                       .map(lambda x: (int(x), 1)) \
                       .sortByKey()
    # This is just a demo on how to bring all the sorted data back to a single node.
    # In reality, we wouldn't want to collect all the data to the driver node.
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print(num)

    sc.stop()
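A minimal sketch of the save-to-HDFS approach described above, reusing lines from the listing; the hdfs:///tmp/sorted output path is a placeholder. Asking sortByKey itself for a single partition keeps the global order in one output file:

    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
                       .map(lambda x: (int(x), 1)) \
                       .sortByKey(numPartitions=1)
    # Write the sorted keys to HDFS instead of collecting them to the driver
    sortedCount.map(lambda kv: str(kv[0])).saveAsTextFile("hdfs:///tmp/sorted")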
4. Logistic regression (LR)
from __future__ import print_function

import sys

import numpy as np
from pyspark import SparkContext


D = 10  # Number of dimensions


# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
# make further computations faster.
# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
def readPointBatch(iterator):
    strs = list(iterator)
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):
        matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
    return [matrix]


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
        exit(-1)

    print("""WARN: This is a naive implementation of Logistic Regression and is
      given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
      to see how MLlib's implementation is used.""", file=sys.stderr)

    sc = SparkContext(appName="PythonLR")
    points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
    iterations = int(sys.argv[2])

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D) - 1
    print("Initial w: " + str(w))

    # Compute logistic regression gradient for a matrix of data points
    def gradient(matrix, w):
        Y = matrix[:, 0]    # point labels (first column of input file)
        X = matrix[:, 1:]   # point coordinates
        # For each point (x, y), compute gradient function, then sum these up
        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)

    def add(x, y):
        x += y
        return x

    for i in range(iterations):
        print("On iteration %i" % (i + 1))
        w -= points.map(lambda m: gradient(m, w)).reduce(add)

    print("Final w: " + str(w))

    sc.stop()
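The warning in the listing points at MLlib's implementation. A minimal sketch of that route, assuming the Spark 1.x MLlib API and the same <label> <x1> ... <xD> input format; lr_data.txt is a placeholder path, and note that MLlib expects 0/1 labels rather than the -1/+1 convention the naive gradient above is written for:

    from pyspark import SparkContext
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint


    def parse_point(line):
        # First value is the label (0/1), the rest are the features
        values = [float(v) for v in line.replace(',', ' ').split()]
        return LabeledPoint(values[0], values[1:])

    sc = SparkContext(appName="PythonLRMLlib")
    points = sc.textFile("lr_data.txt").map(parse_point).cache()  # placeholder input path
    model = LogisticRegressionWithSGD.train(points, iterations=100)
    print("Final weights: " + str(model.weights))
    sc.stop()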
Submitting a PySpark job to run on YARN
/home/hadoop/soft/spark/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 1 \
  --executor-memory 1G \
  wordCount.py
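Note that wordCount.py expects the input file as an argument, so in practice a path is appended after the script name. In cluster mode the driver runs inside YARN, so that path should point to HDFS (for example hdfs:///user/hadoop/input.txt, a placeholder) rather than to a file on the submitting machine.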