  • Common PySpark commands

    1. Getting a SparkSession

    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

    2. Getting a SparkContext (see the setup sketch below)

    1) Get a SparkSession: se = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
    2) Get a SparkContext: sc = se.sparkContext
    3) Get a SQL entry point (in Spark 2.x the SparkSession replaces the old SQLContext): sqlContext = SparkSession.builder.getOrCreate()
    4) Create a DataFrame: df = sqlContext.createDataFrame(userRows)
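
    A minimal, self-contained sketch of the setup above (the app name is arbitrary; assumes a local pyspark installation):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # Build (or reuse) a SparkSession; in Spark 2.x this is the single entry point.
    spark = SparkSession.builder \
        .appName("pyspark-notes") \
        .config(conf=SparkConf()) \
        .getOrCreate()

    sc = spark.sparkContext   # low-level RDD entry point
    print(sc.version)         # confirm the session is alive

    # spark.createDataFrame(...) plays the role of the old sqlContext.createDataFrame(...)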

    3. Reading files

    line1 = sc.textFile("hdfs://192.168.88.128:9000/hello.txt")
    rawData = sc.textFile("hdfs://192.168.88.128:9000/data/sanxi/sanxi/*.gz")   # read all .gz files under the sanxi directory
    rawData = sc.textFile("file:///data/sanxi2/*.gz")                           # read from the local filesystem

    4. Using filter

    1) Keep elements that contain a given substring
      line2 = line1.filter(lambda x : "a" in x)
    2) filter takes a function; elements for which it returns True go into the new RDD
       def hasHWTC1AC5C088(line):
           return "HWTC1AC5C088" in line
       lines2 = lines.filter(hasHWTC1AC5C088)  # pass the function itself into filter, not its return value
    3) Drop the first record of an RDD (e.g. a header row); a runnable sketch follows this list
       header = abc.first()
       df1 = abc.filter(lambda x: x != header)
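
    A runnable sketch of the three filter patterns above, using a small in-memory RDD instead of the HDFS data (the sample strings are illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("filter-demo").getOrCreate()
    sc = spark.sparkContext

    lines = sc.parallelize(["col1,col2", "a,1", "b,2", "a,3"])

    # 1) lambda filter: keep lines containing "a"
    print(lines.filter(lambda x: "a" in x).collect())        # ['a,1', 'a,3']

    # 2) named-function filter: pass the function object itself
    def contains_b(line):
        return "b" in line
    print(lines.filter(contains_b).collect())                # ['b,2']

    # 3) drop the header row
    header = lines.first()
    print(lines.filter(lambda x: x != header).collect())     # ['a,1', 'b,2', 'a,3']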

    5. Using map and flatMap   (apply a lambda function to every record)

    1) line2 = line1.map(lambda x: x.split(" "))

    2) line3 = line1.map(lambda x: x + "abc")  # apply an arbitrary transformation and keep the result

    3) line4 = line1.map(lambda x: (x, 1))  # turn each record into key-value form: the key is the original record, the value is 1

    4) line2.flatMap(lambda line: line.split(" "))

    5) Difference between map and flatMap (for the classic word-count example you must use flatMap to split the words; a runnable comparison follows this list):

       map keeps one output element per input line, so the result stays nested:

        [[u'extends', u'Object'], [u'implements', u'scala.Serializable']]

       flatMap flattens those per-line lists into a single list:

        [u'extends', u'Object', u'implements', u'scala.Serializable']

    6) Take the first 3 columns with map. In the example below, [:3] slices from the start up to (but not including) index 3, i.e. the first three items; [3:] would slice from index 3 to the end.

        Rdd.map(lambda x: x.split(" ")[:3])  # result: [[u'a', u'1', u'3'], [u'b', u'2', u'4'], [u'd', u'3', u'4']]

      ALS training data -- selecting specific columns:

        ratingsRdd = rawRatings.map(lambda x: (x[0], x[1], x[2]))  # result:

          [(u'196', u'242', u'3'), (u'186', u'302', u'3'), (u'22', u'377', u'1')]

    7) Type conversion

        Rdd.map(lambda x: float(x[0]))    # convert the first field to float

    8) Strip all double quotes with replace (the line below replaces " with the empty string):

        df2 = df1.map(lambda x: x.replace('"', ""))

    9) df2 = RDD.map(lambda x: (x[0], float(x[1]), float(x[2])))   # build records with one key followed by several values

        df3 = df2.filter(lambda keyValue: keyValue[0] > 2)    # filter on the key
        df3 = df2.filter(lambda keyValue: keyValue[1] > 2)    # filter on the first value
        df3 = df2.filter(lambda keyValue: keyValue[2] > 2)    # filter on the second value
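
    A minimal sketch contrasting map and flatMap for word counting, using a small in-memory RDD (the sample lines are illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("map-vs-flatmap").getOrCreate()
    sc = spark.sparkContext

    lines = sc.parallelize(["extends Object", "implements scala.Serializable"])

    # map keeps the per-line nesting: one list per input line
    print(lines.map(lambda line: line.split(" ")).collect())
    # [['extends', 'Object'], ['implements', 'scala.Serializable']]

    # flatMap flattens, which is what word count needs
    words = lines.flatMap(lambda line: line.split(" "))
    print(words.collect())
    # ['extends', 'Object', 'implements', 'scala.Serializable']

    # classic word count on top of flatMap
    print(words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect())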

     6. Inspecting the contents of an RDD (a short sketch follows this list)

    print(abc)        # print the object itself (shows the RDD description, not its data)
    type(Rdd)         # get the object's type
    RDD.collect()     # pull the RDD back to the driver as a list, e.g.: [u'{"name":"Michael"}', u'{"name":"Andy", "age":30}', u'{"name":"Justin", "age":19}']
    RDD.count()       # number of records
    df.printSchema()  # show the columns -- note that printSchema() belongs to DataFrames, not RDDs
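
    A short sketch of these inspection calls; the DataFrame at the end only exists to show where printSchema() actually lives:

    from pyspark.sql import SparkSession, Row

    spark = SparkSession.builder.appName("inspect-demo").getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([Row(name="Andy", age=30), Row(name="Justin", age=19)])

    print(rdd)            # RDD description only, not the data
    print(type(rdd))      # the RDD type
    print(rdd.collect())  # the actual rows, pulled back to the driver
    print(rdd.count())    # 2

    df = spark.createDataFrame(rdd)
    df.printSchema()      # printSchema is a DataFrame method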

    7. RDD conversions

    1) RDD to list
            list = RDD.collect()
    2) list to RDD
            RDD = sc.parallelize(list)
    3) Calling map on an RDD
      (1) RDD1 = RDD2.map(lambda x: x+1)  # anonymous function applied to each record; map(lambda x: x.split(",")) splits a string,
          map(lambda x: "abc"+x) rebuilds a string
      (2) RDD2 = RDD1.map(addOne)  # named function applied to each record (i.e. a function defined separately), e.g.:
          def addOne(x):
              return x + 1
          print(RDD1.map(addOne).collect())  # call the named function
    4) Calling filter on an RDD
      (1) intRdd.filter(lambda x: x > 5)  # filter a numeric RDD; intRdd.filter(lambda x: x > 5 and x < 40) -- "and" requires both conditions, "or" requires either
      (2) stringRdd.filter(lambda x: "abc" in x)  # keep records containing "abc"
    5) Removing duplicate elements
      (1) intRdd.distinct()  # deduplicate
    6) Randomly splitting one RDD into two by a given ratio
      (1) sRdd = stringRdd.randomSplit([0.4, 0.6])  # split stringRdd 4:6 into two RDDs; access one of them as sRdd[0]
    7) Grouping with groupBy
      (1) gRdd = intRdd.groupBy(lambda x: x < 2).collect()  # two groups; first group: print(sorted(gRdd[0][1])), second group: print(sorted(gRdd[1][1]))
          (collect() is needed before indexing, because groupBy itself returns an RDD)
      (2) Group with aliases: gRdd = intRdd.groupBy(lambda x: "a" if (x < 2) else "b").collect()
        (1) first group: print(gRdd[0][0], sorted(gRdd[0][1]))
        (2) second group: print(gRdd[1][0], sorted(gRdd[1][1])); the first part, gRdd[1][0], is that group's alias (its key)
    8) Set operations: union for the union, intersection for the intersection (a runnable sketch follows this list)
      1) intRdd1.union(intRdd2)  # e.g. intRdd1 = 1, 3, 1 and intRdd2 = 1, 2, 3, 4 gives: 1, 3, 1, 1, 2, 3, 4
      2) intRdd1.intersection(intRdd2)  # intersection of the two RDDs
      3) intRdd3.subtract(intRdd1)  # difference: elements present in intRdd3 but not in intRdd1
      4) intRdd1.cartesian(intRdd2)  # Cartesian product
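
    A small runnable sketch of the set operations and groupBy above (the RDD names mirror the notes; the numbers are illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("rdd-transform-demo").getOrCreate()
    sc = spark.sparkContext

    intRdd1 = sc.parallelize([1, 3, 1])
    intRdd2 = sc.parallelize([1, 2, 3, 4])

    print(intRdd1.union(intRdd2).collect())         # [1, 3, 1, 1, 2, 3, 4]
    print(intRdd1.intersection(intRdd2).collect())  # [1, 3] (order may vary)
    print(intRdd2.subtract(intRdd1).collect())      # [2, 4] (order may vary)
    print(intRdd1.cartesian(intRdd2).count())       # 3 * 4 = 12 pairs

    # groupBy returns an RDD, so collect() before indexing into the groups
    gRdd = intRdd2.groupBy(lambda x: "a" if x < 2 else "b").collect()
    for key, values in gRdd:
        print(key, sorted(values))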




     

     8. RDD actions (a sketch follows this list)

    [1] Reading elements
      1) first()  # the first record of the RDD
      2) take(2)  # the first two records
      3) takeOrdered(3)  # sort ascending and take the first 3 records
      4) intRdd3.takeOrdered(6, key=lambda x: -x)  # sort descending and take the first 6 records
    [2] Statistics
      1) intRdd1.stats()  # summary statistics of intRdd1, e.g.: (count: 5, mean: 5.0, stdev: 2.82842712475, max: 9, min: 1)
         mean is the average, stdev the standard deviation
      2) intRdd3.min()    # minimum
      3) intRdd3.max()    # maximum
      4) intRdd3.stdev()  # standard deviation
      5) intRdd3.count()  # number of records
      6) intRdd3.sum()    # sum
      7) intRdd3.mean()   # average
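
    A quick sketch of the actions above on a toy numeric RDD (the values are illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("rdd-actions-demo").getOrCreate()
    sc = spark.sparkContext

    intRdd3 = sc.parallelize([5, 1, 9, 3, 7])

    print(intRdd3.first())                           # 5
    print(intRdd3.take(2))                           # [5, 1]
    print(intRdd3.takeOrdered(3))                    # [1, 3, 5]
    print(intRdd3.takeOrdered(3, key=lambda x: -x))  # [9, 7, 5]
    print(intRdd3.stats())                           # count, mean, stdev, max, min in one pass
    print(intRdd3.min(), intRdd3.max(), intRdd3.sum(), intRdd3.mean(), intRdd3.count())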

    9. Basic key-value transformations (a runnable sketch follows this list)

    1) kvRdd1 = sc.parallelize([(1, 4),(2, 5),(3, 6),(4, 7)])  # create the key-value source data
      result: [(1, 4), (2, 5), (3, 6), (4, 7)]
    2) kvRdd1.keys()    # all keys
    3) kvRdd1.values()  # all values
    4) kvRdd1.filter(lambda keyValue: keyValue[0] > 2)  # keep pairs with key > 2
    5) kvRdd1.filter(lambda keyValue: keyValue[1] > 5)  # keep pairs with value > 5
    6) kvRdd1.mapValues(lambda x: x*x)  # apply a function to every value
    7) kvRdd1.sortByKey()  # sort by key, ascending
    8) kvRdd1.sortByKey(ascending=False)  # sort by key, descending
    9) kvRdd3.reduceByKey(lambda x, y: x+y)  # merge values that share the same key by adding them
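
    A runnable sketch of the key-value transformations above (kvRdd1 follows the notes; kvRdd3 is made up here just to show reduceByKey):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kv-transform-demo").getOrCreate()
    sc = spark.sparkContext

    kvRdd1 = sc.parallelize([(1, 4), (2, 5), (3, 6), (4, 7)])

    print(kvRdd1.keys().collect())                           # [1, 2, 3, 4]
    print(kvRdd1.values().collect())                         # [4, 5, 6, 7]
    print(kvRdd1.filter(lambda kv: kv[0] > 2).collect())     # [(3, 6), (4, 7)]
    print(kvRdd1.filter(lambda kv: kv[1] > 5).collect())     # [(3, 6), (4, 7)]
    print(kvRdd1.mapValues(lambda v: v * v).collect())       # [(1, 16), (2, 25), (3, 36), (4, 49)]
    print(kvRdd1.sortByKey(ascending=False).collect())       # [(4, 7), (3, 6), (2, 5), (1, 4)]

    kvRdd3 = sc.parallelize([(1, 2), (2, 3), (2, 5)])
    print(kvRdd3.reduceByKey(lambda x, y: x + y).collect())  # [(1, 2), (2, 8)] (order may vary)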

     10. Transformations across multiple key-value RDDs (a paste-able sketch follows this list)

    1) join
      intK1 = sc.parallelize([(1,5),(2,6),(3,7),(4,8),(5,9)])
      intK2 = sc.parallelize([(3,30),(2,20),(6,60)])
      intK1.join(intK2) gives: [(2, (6, 20)), (3, (7, 30))]
    2) left outer join
      intK1.leftOuterJoin(intK2).collect() gives:
      [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))]
    3) right outer join
      intK1.rightOuterJoin(intK2).collect() gives:
      [(2, (6, 20)), (6, (None, 60)), (3, (7, 30))]
    4) subtractByKey -- drop from intK1 the pairs whose key also appears in intK2
      intK1.subtractByKey(intK2) gives:
      [(4, 8), (1, 5), (5, 9)]
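
    The joins above as a self-contained sketch that can be pasted into a pyspark shell:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kv-join-demo").getOrCreate()
    sc = spark.sparkContext

    intK1 = sc.parallelize([(1, 5), (2, 6), (3, 7), (4, 8), (5, 9)])
    intK2 = sc.parallelize([(3, 30), (2, 20), (6, 60)])

    print(intK1.join(intK2).collect())            # inner join: keys 2 and 3 only
    print(intK1.leftOuterJoin(intK2).collect())   # every key of intK1, None where intK2 has no match
    print(intK1.rightOuterJoin(intK2).collect())  # every key of intK2, None where intK1 has no match
    print(intK1.subtractByKey(intK2).collect())   # pairs of intK1 whose key is absent from intK2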

    11. Key-value actions

    1) intK1.first()     # the first pair
    2) intK1.collect()   # all pairs
    3) intK1.take(2)     # the first two pairs
    4) intK1.first()[0]  # the key of the first pair
    5) intK1.first()[1]  # the value of the first pair
      For example, given the leftOuterJoin result [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))],
      to get the 6 from the first record use intK1.leftOuterJoin(intK2).first()[1][0]: [1] takes the first record's value,
      and [0] then takes the first item inside that value, i.e. 6.
    6) intK3.countByKey()  # count how many pairs each key has, e.g.
      source data: [(1, 2), (2, 3), (2, 5), (2, 8), (5, 10)]
      result: defaultdict(<type 'int'>, {1: 1, 2: 3, 5: 1})
    7) KV = intK3.collectAsMap()  # collect the pairs into a dict (for duplicate keys, later values overwrite earlier ones)
      result: {1: 2, 2: 8, 5: 10}
      For example, to get the value 8, use KV[2].
    8) intK3.lookup(2)  # all values whose key is 2; to run further statistics on them, turn the result back into an RDD first
    9) Broadcast variables (see the combined sketch after this list)
      1> kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "grape")])  # build the key-value lookup table
      2> fruitMap = kvFruit.collectAsMap()  # convert it to a dict
      3> bcFruitMap = sc.broadcast(fruitMap)  # create the broadcast variable
      4> fruitIds = sc.parallelize([2, 1, 3])  # an RDD of ids (each id must exist in the map)
      5> fruitNames = fruitIds.map(lambda x: bcFruitMap.value[x])  # use bcFruitMap.value to translate each id into its name
    10) Computing a sum with accumulators
      intRdd = sc.parallelize([1, 2, 44, 2, 11, 22])  # source data
      total = sc.accumulator(0.0)  # a float accumulator for the sum
      num = sc.accumulator(0)      # an int accumulator for the count
      intRdd.foreach(lambda l: [total.add(l), num.add(1)])  # accumulate inside foreach
      total.value  # the sum
      num.value    # the count
      avg = total.value / num.value  # the average
    11) RDD persistence

      1. See page 221 of the book for the list of persistence levels
      2. intRdd1.persist()  # persist with the default storage level
      3. intRdd1.persist(StorageLevel.MEMORY_AND_DISK)  # choose a storage level (from pyspark import StorageLevel)
      4. intRdd1.is_cached  # check whether the RDD is persisted
    12) RDD.saveAsTextFile("hdfs://192.168.88.128:9000/data/result.txt")  # save the result to a file
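
    A sketch combining the broadcast-variable and accumulator patterns above (the data mirrors the notes and is illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("broadcast-accumulator-demo").getOrCreate()
    sc = spark.sparkContext

    # broadcast a small lookup table to every executor
    kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "grape")])
    bcFruitMap = sc.broadcast(kvFruit.collectAsMap())

    fruitIds = sc.parallelize([2, 1, 3])
    print(fruitIds.map(lambda x: bcFruitMap.value[x]).collect())   # ['orange', 'apple', 'grape']

    # accumulators: written on the executors, read back on the driver
    intRdd = sc.parallelize([1, 2, 44, 2, 11, 22])
    total = sc.accumulator(0.0)
    num = sc.accumulator(0)
    intRdd.foreach(lambda x: [total.add(x), num.add(1)])
    print(total.value, num.value, total.value / num.value)   # sum, count, average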



      

     12. Data formats

    1. [[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']]   raw data after splitting/slicing; use x[0], x[1] inside map to pick a column,
      and use map to convert it to key-value form, e.g.: df3 = df2.map(lambda x: (x[0], x[1]))

    2. key-value format
      In [(u'3', u'5'), (u'4', u'6'), (u'4', u'5'), (u'4', u'2')], each () is one record: the first item is the key, the second the value.
    3. PipelinedRDD is simply the type an RDD gets after a Python transformation such as map; in these notes it usually holds key-value data, but the type itself does not imply key-value form. (A short sketch follows.)
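
    A small sketch of the conversion from split rows to key-value pairs described above (the data is illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("format-demo").getOrCreate()
    sc = spark.sparkContext

    rows = sc.parallelize(["3 5", "4 6", "4 5", "4 2"]).map(lambda x: x.split(" "))
    print(rows.collect())   # [['3', '5'], ['4', '6'], ['4', '5'], ['4', '2']]

    kv = rows.map(lambda x: (x[0], x[1]))
    print(kv.collect())     # [('3', '5'), ('4', '6'), ('4', '5'), ('4', '2')]
    print(type(kv))         # a PipelinedRDD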

     13. RDD type conversion

    1) Build an RDD of Row objects
    userRdd = sc.textFile("D:datapeople.json")
        userRdd = userRdd.map(lambda x: x.split(" "))
        
        userRows = userRdd.map(lambda p:
                                Row(
                                    userName = p[0],
                                    userAge = int(p[1]),
                                    userAdd = p[2],
                                    userSalary = int(p[3])
                                    )
                                )
        print(userRows.take(4))

    Result: [Row(userAdd='shanghai', userAge=20, userName='zhangsan', userSalary=13), Row(userAdd='beijin', userAge=30, userName='lisi', userSalary=15)]

    2) Create a DataFrame
      userDF = sqlContext.createDataFrame(userRows)

    14. Querying fields with a SQL statement

    from pyspark.conf import SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import Row
    
    
    if __name__ == '__main__':
        spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
            
        sc = spark.sparkContext
        
        rd = sc.textFile("D:datapeople.txt")
        rd2 = rd.map(lambda x:x.split(","))
        people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))
        
        peopleDF = spark.createDataFrame(people)
        peopleDF.createOrReplaceTempView("people")
        teenagers = spark.sql("SELECT name,age FROM people where name='Andy'")
        teenagers.show(5)
    #     print(teenagers.rdd.collect())
        teenNames = teenagers.rdd.map(lambda p: 100 + p.age).collect()
        
        
        for name in teenNames:
            print(name)
            
        

    15. Detailed example of DataFrame, SQL, and JSON usage

    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    """
    A simple example demonstrating basic Spark SQL features.
    Run with:
      ./bin/spark-submit examples/src/main/python/sql/basic.py
    """
    from __future__ import print_function
    
    # $example on:init_session$
    from pyspark.sql import SparkSession
    # $example off:init_session$
    
    # $example on:schema_inferring$
    from pyspark.sql import Row
    # $example off:schema_inferring$
    
    
    # $example on:programmatic_schema$
    # Import data types
    from pyspark.sql.types import *
    # $example off:programmatic_schema$
    
    
    def basic_df_example(spark):
        # $example on:create_df$
        # spark is an existing SparkSession
        df = spark.read.json("/data/people.json")
        # Displays the content of the DataFrame to stdout
        df.show()
        # +----+-------+
        # | age|   name|
        # +----+-------+
        # |null|Michael|
        # |  30|   Andy|
        # |  19| Justin|
        # +----+-------+
        # $example off:create_df$
    
        # $example on:untyped_ops$
        # spark, df are from the previous example
        # Print the schema in a tree format
        df.printSchema()
        # root
        # |-- age: long (nullable = true)
        # |-- name: string (nullable = true)
    
        # Select only the "name" column
        df.select("name").show()
        # +-------+
        # |   name|
        # +-------+
        # |Michael|
        # |   Andy|
        # | Justin|
        # +-------+
    
        # Select everybody, but increment the age by 1
        df.select(df['name'], df['age'] + 1).show()
        # +-------+---------+
        # |   name|(age + 1)|
        # +-------+---------+
        # |Michael|     null|
        # |   Andy|       31|
        # | Justin|       20|
        # +-------+---------+
    
        # Select people older than 21
        df.filter(df['age'] > 21).show()
        # +---+----+
        # |age|name|
        # +---+----+
        # | 30|Andy|
        # +---+----+
    
        # Count people by age
        df.groupBy("age").count().show()
        # +----+-----+
        # | age|count|
        # +----+-----+
        # |  19|    1|
        # |null|    1|
        # |  30|    1|
        # +----+-----+
        # $example off:untyped_ops$
    
        # $example on:run_sql$
        # Register the DataFrame as a SQL temporary view
        df.createOrReplaceTempView("people")
    
        sqlDF = spark.sql("SELECT * FROM people")
        sqlDF.show()
        # +----+-------+
        # | age|   name|
        # +----+-------+
        # |null|Michael|
        # |  30|   Andy|
        # |  19| Justin|
        # +----+-------+
        # $example off:run_sql$
    
        # $example on:global_temp_view$
        # Register the DataFrame as a global temporary view
        df.createGlobalTempView("people")
    
        # Global temporary view is tied to a system preserved database `global_temp`
        spark.sql("SELECT * FROM global_temp.people").show()
        # +----+-------+
        # | age|   name|
        # +----+-------+
        # |null|Michael|
        # |  30|   Andy|
        # |  19| Justin|
        # +----+-------+
    
        # Global temporary view is cross-session
        spark.newSession().sql("SELECT * FROM global_temp.people").show()
        # +----+-------+
        # | age|   name|
        # +----+-------+
        # |null|Michael|
        # |  30|   Andy|
        # |  19| Justin|
        # +----+-------+
        # $example off:global_temp_view$
    
    
    def schema_inference_example(spark):
        # $example on:schema_inferring$
        sc = spark.sparkContext
    
        # Load a text file and convert each line to a Row.
        lines = sc.textFile("examples/src/main/resources/people.txt")
        parts = lines.map(lambda l: l.split(","))
        people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    
        # Infer the schema, and register the DataFrame as a table.
        schemaPeople = spark.createDataFrame(people)
        schemaPeople.createOrReplaceTempView("people")
    
        # SQL can be run over DataFrames that have been registered as a table.
        teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
        # The results of SQL queries are Dataframe objects.
        # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
        teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
        for name in teenNames:
            print(name)
        # Name: Justin
        # $example off:schema_inferring$
    
    
    def programmatic_schema_example(spark):
        # $example on:programmatic_schema$
        sc = spark.sparkContext
    
        # Load a text file and convert each line to a Row.
        lines = sc.textFile("examples/src/main/resources/people.txt")
        parts = lines.map(lambda l: l.split(","))
        # Each line is converted to a tuple.
        people = parts.map(lambda p: (p[0], p[1].strip()))
    
        # The schema is encoded in a string.
        schemaString = "name age"
    
        fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
        schema = StructType(fields)
    
        # Apply the schema to the RDD.
        schemaPeople = spark.createDataFrame(people, schema)
    
        # Creates a temporary view using the DataFrame
        schemaPeople.createOrReplaceTempView("people")
    
        # SQL can be run over DataFrames that have been registered as a table.
        results = spark.sql("SELECT name FROM people")
    
        results.show()
        # +-------+
        # |   name|
        # +-------+
        # |Michael|
        # |   Andy|
        # | Justin|
        # +-------+
        # $example off:programmatic_schema$
    
    if __name__ == "__main__":
        # $example on:init_session$
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
        # $example off:init_session$
    
        basic_df_example(spark)
        # schema_inference_example(spark)
        # programmatic_schema_example(spark)
    
        spark.stop()
  • Original article: https://www.cnblogs.com/redhat0019/p/8665491.html