  • Spark Study Notes: Reading and Writing HDFS

    Reading and writing Parquet files in HDFS with Spark.

    The Parquet files in the directory (screenshot omitted):

    The build.sbt file:

    name := "spark-hbase"
    
    version := "1.0"
    
    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.1.0",
      "mysql" % "mysql-connector-java" % "5.1.31",
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.hbase" % "hbase-common" % "1.3.0",
      "org.apache.hbase" % "hbase-client" % "1.3.0",
      "org.apache.hbase" % "hbase-server" % "1.3.0",
      "org.apache.hbase" % "hbase" % "1.2.1"
    )
    

    The Scala implementation:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._
    import java.util.Properties
    
    // NOTE: the imports below (Guava, sql.types, HBase) are not used by this
    // Parquet example; they are left over from the larger spark-hbase project.
    import com.google.common.collect.Lists
    import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    
    
    /**
      * Created by mi on 17-4-11.
      */
    
    // Unused in this example.
    case class resultset(name: String,
                         info: String,
                         summary: String)
    
    case class IntroItem(name: String, value: String)
    
    
    case class BaikeLocation(name: String,
                             url: String = "",
                             info: Seq[IntroItem] = Seq(),
                             summary: Option[String] = None)
    
    case class MewBaikeLocation(name: String,
                                url: String = "",
                                info: Option[String] = None,
                                summary: Option[String] = None)
    
    
    object MysqlOpt {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // Define the database URL and table info
        val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
        val table = "baike_pages"
    
        // Read the Parquet file, then write to MySQL
        val sparkSession = SparkSession.builder()
          .master("local")
          .appName("spark session example")
          .getOrCreate()
        val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
    //    parquetDF.collect().take(20).foreach(println)
        //parquetDF.show()
    
        // BaikeLocation is the case class matching the schema of the Parquet file being read
        val ds = parquetDF.as[BaikeLocation].map { line =>
          // Convert info into the String representation used by the new case class
          val info = line.info.map(item => item.name + ":" + item.value).mkString(",")
          // Note: put the fields in a case class, otherwise the column names are lost
          MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)
        }.cache()
    
        ds.show()
    //    ds.take(2).foreach(println)
    
        // Write to MySQL
        //    val prop = new Properties()
        //    prop.setProperty("user", "root")
        //    prop.setProperty("password", "123456")
        //    ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)
    
        // Write back out as a Parquet file
        ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")
    
      }
    
    }
    

    If the mapped fields are not wrapped in a case class, the columns printed by df.show lose their names: instead of name, url, info, summary you only get positional names (_1, _2, _3, _4).
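
    A minimal sketch of that pitfall, continuing from the parquetDF defined above (the tuple-returning map is hypothetical, for illustration only):

    val tupled = parquetDF.as[BaikeLocation].map { line =>
      // Returning a plain tuple instead of a case class: the values
      // survive, but the column names do not.
      (line.name, line.url, line.summary)
    }
    tupled.show() // columns print as _1, _2, _3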

    Use spark-shell to inspect the Parquet files that were written back:

    // Inside spark-shell (Spark 2.x already provides the `spark` session)
    val path = "file:///home/mi/coding/coding/baikeshow_data/baikeshow1"
    // SQLContext.parquetFile is deprecated since Spark 1.4; use the reader API
    val df = spark.read.parquet(path)
    df.show
    df.count
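
    To confirm that the column names survived the round trip, printing the schema is a quick check (same spark-shell session):

    df.printSchema() // should list name, url, info, summary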
    


    To display only a single column:

    df.select("title").take(100).foreach(println)  //只显示title这一列的信息
    