  • 2020 Winter Break Study Log (15): Reading and Writing MySQL Data Programmatically with DataFrame

    (1) In MySQL, create a new database sparktest, then create a table employee containing the two rows shown in Table 6-2.

    Table 6-2: Original data in the employee table

    id  name   gender  age
    1   Alice  F       22
    2   John   M       25

    hadoop@hadoop-virtual-machine:~$ mysql -u root -p
    Enter password: 
    Welcome to the MySQL monitor.  Commands end with ; or \g.
    ...
    
    mysql> create database sparktest;
    Query OK, 1 row affected (0.00 sec)
    
    mysql> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | information_schema |
    | hive               |
    | mysql              |
    | performance_schema |
    | sparktest          |
    | sys                |
    +--------------------+
    6 rows in set (0.10 sec)
    
    
    mysql> use sparktest;
    Database changed
    
    mysql> create table employee(id int(10), name varchar(20), gender varchar(2),age int(4));
    Query OK, 0 rows affected (0.07 sec)
    
    mysql> describe employee;
    +--------+-------------+------+-----+---------+-------+
    | Field  | Type        | Null | Key | Default | Extra |
    +--------+-------------+------+-----+---------+-------+
    | id     | int(10)     | YES  |     | NULL    |       |
    | name   | varchar(20) | YES  |     | NULL    |       |
    | gender | varchar(2)  | YES  |     | NULL    |       |
    | age    | int(4)      | YES  |     | NULL    |       |
    +--------+-------------+------+-----+---------+-------+
    4 rows in set (0.12 sec)
    
    mysql> insert into employee values ("1","Alice","F","22")
        -> ;
    Query OK, 1 row affected (0.06 sec)
    
    mysql> insert into employee values ("2","John","M","25");
    Query OK, 1 row affected (0.01 sec)


    (2) Configure Spark to connect to MySQL over JDBC. Then write a program that uses a DataFrame to insert the two rows shown in Table 6-3 into MySQL, and finally print the maximum and the sum of age.

    Table 6-3: New rows to add to the employee table

    id  name  gender  age
    3   Mary  F       26
    4   Tom   M       23
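    The Spark program turns each row of Table 6-3 into a space-separated string, splits it, and converts the fields by position. The Scala below is a Spark-free sketch of just that parsing step. It maps one line to a typed record and returns None for malformed input instead of throwing. `Employee` and `parseEmployee` are hypothetical names introduced here. Splitting on runs of whitespace (`\\s+`) is an assumption and is slightly more lenient than the original `split(" ")`.

```scala
import scala.util.Try

// Hypothetical helper, not part of the original program: parses one
// whitespace-separated "id name gender age" line into a typed record.
object EmployeeParsing {
  final case class Employee(id: Int, name: String, gender: String, age: Int)

  // Returns None when the line does not have exactly four fields
  // or when id/age are not valid integers.
  def parseEmployee(line: String): Option[Employee] =
    line.trim.split("\\s+") match {
      case Array(id, name, gender, age) =>
        for {
          i <- Try(id.toInt).toOption
          a <- Try(age.toInt).toOption
        } yield Employee(i, name, gender, a)
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // The two new rows from Table 6-3
    val newRows = Seq("3 Mary F 26", "4 Tom M 23")
    newRows.flatMap(parseEmployee).foreach(println)
  }
}
```

    Inside a Spark job, `flatMap(parseEmployee)` over the string RDD would keep only well-formed lines. Whether silently dropping bad lines is acceptable depends on the data, so treat this as one design option rather than the author's approach.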

    
    


    scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable", "employee").option("user", "root").option("password", "7").load()
    jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

    
    

    scala> jdbcDF.show()
    +---+-----+------+---+
    | id| name|gender|age|
    +---+-----+------+---+
    |  1|Alice|     F| 22|
    |  2| John|     M| 25|
    +---+-----+------+---+





    import java.util.Properties

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    object TestMySQL {
      def main(args: Array[String]): Unit = {
        // spark-shell predefines `spark`; a standalone app has to build it.
        // The MySQL Connector/J jar must be on the classpath (e.g. via --jars).
        val spark = SparkSession.builder().appName("TestMySQL").getOrCreate()

        // The two new rows from Table 6-3, split into fields
        val employeeRDD = spark.sparkContext
          .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
          .map(_.split(" "))

        // Schema matching the employee table
        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("gender", StringType, true),
          StructField("age", IntegerType, true)))

        val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
        val employeeDF = spark.createDataFrame(rowRDD, schema)

        // JDBC connection properties
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "hadoop")
        prop.put("driver", "com.mysql.jdbc.Driver")

        // Append the new rows to sparktest.employee
        employeeDF.write.mode("append")
          .jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

        // Read the table back and print max(age) and sum(age)
        val jdbcDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/sparktest")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "employee")
          .option("user", "root")
          .option("password", "hadoop")
          .load()
        jdbcDF.agg("age" -> "max", "age" -> "sum").show()

        spark.stop()
      }
    }
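    The arithmetic behind the final `show()` can be checked without Spark or MySQL. This sketch assumes the table holds exactly the four rows of Tables 6-2 and 6-3 (ages 22, 25, 26, 23). It computes the same two aggregates as `agg("age" -> "max", "age" -> "sum")`. The sum is widened to Long, mirroring Spark, which returns `bigint` for `sum` over an integer column. `ExpectedAgeStats` is a hypothetical name.

```scala
// Hypothetical plain-Scala check of the expected aggregation result.
// Assumes the employee table contains exactly the rows of Tables 6-2 and 6-3.
object ExpectedAgeStats {
  // Ages of Alice and John (Table 6-2), then Mary and Tom (Table 6-3)
  val ages: Seq[Int] = Seq(22, 25, 26, 23)

  // Equivalent of "age" -> "max"
  def maxAge(xs: Seq[Int]): Int = xs.max

  // Equivalent of "age" -> "sum", widened to Long as Spark does for int columns
  def sumAge(xs: Seq[Int]): Long = xs.map(_.toLong).sum

  def main(args: Array[String]): Unit = {
    println(s"max(age) = ${maxAge(ages)}") // prints "max(age) = 26"
    println(s"sum(age) = ${sumAge(ages)}") // prints "sum(age) = 96"
  }
}
```

    With these four rows, the expected result is max(age) = 26 and sum(age) = 22 + 25 + 26 + 23 = 96. The write uses `mode("append")`, so running the program a second time inserts Mary and Tom again and the reported sum grows accordingly.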
  • Original post: https://www.cnblogs.com/Qi77/p/12324158.html