zoukankan      html  css  js  c++  java
  • SparkSQL

    Spark SQL 增加了DataFrame 即带有Schema信息的RDD

     DataFrame 创建

    启动pyspark(由于内存不够 启动本地,模式)

    pyspark --master local

    pyspark 自动生成 sc,sparksession

    from pyspark import SparkContext,SparkConf
    
    from pyspark.sql import SparkSession
    
    spark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    
    df=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
    
    df.show()

    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+

    1。利用反射机制推断RDD

    schemaPeople.createOrReplaceTempView("people")的people 是一个名称

    persionDF是新建的DataFrame 每个sql就是一个DataFrame

    下面从rdd转到df再到rdd

     2。使用编程方法定义RDD

     for循环2次

    名字,数据类型,可否为空

     

    记录放前面,表头放后面

    sparksql读取Mysql

     Mysql数据库的准备:

    http://dblab.xmu.edu.cn/blog/install-mysql/ mysql安装

    1. sudo apt-get update #更新软件源
    2. sudo apt-get install mysql-server #安装mysql
    3. service mysql start

     sudo mysql 启动mysql

    修改用户 hadoop

    select user, host from mysql.user;      看当前的所有用户

    grant all privileges on *.* to 'hadoop'@'localhost' identified by "Unsw2016";  给hadoop权限并设密码

    flush privileges;    刷新权限

    show grants for 'hadoop'@'localhost';     看权限

    sql基本操作:

    https://www.jianshu.com/p/652495b6fea6

    mysql -u hadoop -p
    启动mysql

     

    https://dev.mysql.com/downloads/connector/j/ 

    使用spark sql读取mysql

    通过JDBC链接mysql

    老是他秒的读不出 必须在pyspark启动时添加jar包

    pyspark --master local --jars /usr/local/spark/jars/mysql-connector-java-5.1.48.jar

    pyspark下执行  通过jdbc连接mysql

    jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark")
    option("dbtable","student").option("user","hadoop").option("password","Unsw2016").load()

    jdbcDF.show()

    通过从mysql中读取数据生成dataframe

    使用spark sql写入mysql

     Spark官方推荐使用Row对象来代替dict: from pyspark.sql import Row >>> User = Row('name', 'id', 'age')

    先生成表头(模式信心)structuredType提供一个列表对象

    没一个structField描述一个字段,Ture代表可以为空

    3步:得到表头,得到表中记录,表头和表中记录拼接形成打他frame,第四步是写入底部数据库

    表中记录是row对象封装的记录

     .map操作不会改变元素的个数

    p代表列表 并去掉尾部空格

     

    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark import SparkContext,SparkConf
    from pyspark.sql import SparkSession
    
    spark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    
    
    #model info table head, table record and combine them
    
    schema = StructType([StructField("id",IntegerType(),True),
            StructField("name",StringType(),True),
            StructField("gender",StringType(),True),
            StructField("age",IntegerType(),True)])
    
    #above is table head, below set two data,means two students
    studentRDD=spark.sparkContext
            .parallelize(["3 Rongcheng M 26","4 Guanhua M 27"])
            .map(lambda x:x.split(" "))
    
    #below create Row object, every Row object is a line in rowRDD
    rowRDD=studentRDD.map(lambda p:Row(int(p[0].strip()),p[1].strip(),p[2].strip(),int(p[3].strip())))
    
    #combine table head and table content
    studentDF=spark.createDataFrame(rowRDD,schema)
    
    #write into database
    prop={}
    prop["user"]="hadoop"
    prop["password"]="Unsw2016"
    prop["driver"]="com.mysql.jdbc.Driver"
    studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark","student","append",prop)

     写入成功

     下面复习所有spark知识点,python看完

  • 相关阅读:
    [Sql Server][原创]
    SQL Server T-SQL高级查询
    SQL 网文链接
    Epicor系统二次开发
    lambda表达式的变量作用域
    写一个正则表达式匹配手机号
    函数装饰器在类方法中的使用方法
    关于Django的session的使用 (装饰器版)
    Django ORM相关操作(2)
    Django ORM相关操作(1)
  • 原文地址:https://www.cnblogs.com/cschen588/p/11827898.html
Copyright © 2011-2022 走看看