zoukankan      html  css  js  c++  java
  • spark读取文本数据测试

    已知文本有三列,整理数据,并导入mysql


    scala> import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext

    scala> val gitrdd=sc.textFile("/tmp/git.txt")
    gitrdd: org.apache.spark.rdd.RDD[String] = /tmp/git.txt MapPartitionsRDD[1] at textFile at <console>:25


    scala> gitrdd.count
    res2: Long = 548

    分隔符为数量不定的空格

    scala> gitrdd.map(_.split("   |  ")).filter(_.length<3).count
    res3: Long = 8

    scala> gitrdd.map(_.split("   |  ")).filter(_.length<3).collect
    res1: Array[Array[String]] = Array(Array(""), Array(""), Array(" "), Array(""))

    scala> val gitDF=gitrdd.map(_.split("   |  ")).filter(_.length==3).map(x=>(x(0),x(1),x(2))).toDF
    gitDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]


    scala> gitDF.registerTempTable("tb_git")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> sqlcon.sql("select * from tb_git").show
    +--------------------+--------------------+----------------+
    |                  _1|                  _2|              _3|
    +--------------------+--------------------+----------------+
    ...................................................

    .................................................
    +--------------------+--------------------+----------------+
    only showing top 20 rows


    scala> val gitDF=gitrdd.map(_.split("   |  ")).filter(_.length==3).map(x=>(x(0),x(1),x(2))).toDF("name","email","else")
    gitDF: org.apache.spark.sql.DataFrame = [name: string, email: string ... 1 more field]

    scala> gitDF.registerTempTable("tb_git")
    warning: there was one deprecation warning; re-run with -deprecation for details


    scala> sqlcon.sql("select * from tb_git").show
    +--------------------+--------------------+----------------+
    |                name|               email|            else|
    +--------------------+--------------------+----------------+
    .........................
    +--------------------+--------------------+----------------+
    only showing top 20 rows

    mysql> create table tb_git(name varchar(50),email varchar(80),else1 varchar(50));  
    Query OK, 0 rows affected (0.04 sec)

    scala>  import java.sql.{Connection,DriverManager, PreparedStatement,Date}
    import java.sql.{Connection, DriverManager, PreparedStatement, Date}

    scala> def rddtodb(iter: Iterator[(String, String, String)]): Unit = {
             // Insert every (name, email, else1) tuple of one RDD partition into MySQL.
             // Intended to be called via foreachPartition, so a single connection and a
             // single prepared statement serve the whole partition.
             val sql = "insert into tb_git(name,email,else1)values(?,?,?)"
             var con: Connection = null
             var ps: PreparedStatement = null
             try {
               con = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8",
                 "root", "root")
               // Prepare once and reuse for every row; the original prepared a new
               // statement per row and leaked all but the last one.
               ps = con.prepareStatement(sql)
               iter.foreach { case (name, email, other) =>
                 ps.setString(1, name)
                 ps.setString(2, email)
                 ps.setString(3, other)
                 ps.executeUpdate()
               }
             } catch {
               case e: Exception => println(e.toString)
             } finally {
               // Close in reverse order of acquisition: statement first, then
               // connection. The original closed the connection first, so ps.close
               // ran against an already-closed connection.
               if (ps != null) ps.close()
               if (con != null) con.close()
             }
           }
    rddtodb: (iter: Iterator[(String, String, String)])Unit

    scala> gitrdd.map(_.split("   |  ")).filter(_.length==3).map(x=>(x(0).trim,x(1).trim,x(2).trim)).foreachPartition(rddtodb)

    mysql> select count(1) from tb_git;
    +----------+
    | count(1) |
    +----------+
    |      534 |
    +----------+
    1 row in set (0.03 sec)

  • 相关阅读:
    cs ip 通过jmp转移命令间接赋值。无法直接对其赋值。
    8086 cpu为什么要把段地址*16+偏移量形成物理地址呢?
    保护模式和实模式的区别
    计算机的内存是以字节为单位的, 这个认知很重要。
    计算机的内存是以字节为单位的。
    一个字 word 是16位, 一个字由两个字节组成 , 字节=byte ,一个字节8位, 位=bit 如果没有特殊说明kb 就是指 k*bit
    物理地址为20位 如10000H 用段地址*16+偏移地址表示
    深入学习Java线程池
    在线考试系统镜像构建、推送、部署
    容器 变成镜像提交到阿里云镜像仓库
  • 原文地址:https://www.cnblogs.com/playforever/p/9661245.html
Copyright © 2011-2022 走看看