zoukankan      html  css  js  c++  java
  • 数据读取保存(五)

    Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统:

    • 文件格式:Text 文件、Json 文件、csv 文件、Sequence 文件以及 Object 文件
    • 文件系统:本地文件系统、HDFS、Hbase 以及数据库

    1. 读写 text/hdfs 文件

    text/hdfs 类型的文件读都可以用 textFile(path),保存使用 saveAsTextFile(path)

    // 读取本地文件,必须保证每个节点都有该文件
    val rdd = sc.textFile("./xx.txt")
    
    // 保存到 hdfs
    rdd.saveAsTextFile(hdfs://hadoop1:9000/test/info.json)
    

    2. 读取 json 文件

    json 文件主要是需要解析其 json 格式,一般采用:SparkSQL,也可以使用 fastjson、scala.util.parsing.json.JSON

    scala> val rdd = sc.textFile("hdfs://hadoop1:9000/test/info.json")
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop1:9000/test/info.json MapPartitionsRDD[45] at textFile at <console>:24
    
    scala> rdd.collect()
    res39: Array[String] = Array({"age": 0, "name": "rose0"}, {"age": 1, "name": "rose1"}, {"age": 2, "name": "rose2"}, {"age": 3, "name": "rose3"}, {"age": 4, "name": "rose4"}, {"age": 5, "name": "rose5"}, {"age": 6, "name": "rose6"}, {"age": 7, "name": "rose7"}, {"age": 8, "name": "rose8"}, {"age": 9, "name": "rose9"}, {"age": 10, "name": "rose10"}, {"age": 11, "name": "rose11"}, {"age": 12, "name": "rose12"}, {"age": 13, "name": "rose13"}, {"age": 14, "name": "rose14"}, {"age": 15, "name": "rose15"}, {"age": 16, "name": "rose16"}, {"age": 17, "name": "rose17"}, {"age": 18, "name": "rose18"}, {"age": 19, "name": "rose19"}, {"age": 20, "name": "rose20"}, {"age": 21, "name": "rose21"}, {"age": 22, "name": "rose22"}, {"age": 23, "name": "rose23"}, {"age": 24, "name": "rose24"}, {"age": 25, "...
    scala> import scala.util.parsing.json.JSON
    import scala.util.parsing.json.JSON
    
    // 解析到的结果其实就是 Option 组成的数组, Option 存储的就是 Map 对象
    scala> val rdd2 = rdd.map(JSON.parseFull).collect()
    rdd2: Array[Option[Any]] = Array(Some(Map(age -> 0.0, name -> rose0)), Some(Map(age -> 1.0, name -> rose1)), Some(Map(age -> 2.0, name -> rose2)), Some(Map(age -> 3.0, name -> rose3)), Some(Map(age -> 4.0, name -> rose4)), Some(Map(age -> 5.0, name -> rose5)), Some(Map(age -> 6.0, name -> rose6)), Some(Map(age -> 7.0, name -> rose7)), Some(Map(age -> 8.0, name -> rose8)), Some(Map(age -> 9.0, name -> rose9)), Some(Map(age -> 10.0, name -> rose10)), Some(Map(age -> 11.0, name -> rose11)), Some(Map(age -> 12.0, name -> rose12)), Some(Map(age -> 13.0, name -> rose13)), Some(Map(age -> 14.0, name -> rose14)), Some(Map(age -> 15.0, name -> rose15)), Some(Map(age -> 16.0, name -> rose16)), Some(Map(age -> 17.0, name -> rose17)), Some(Map(age -> 18.0, name -> rose18)), Some(Map(age -> 19.0, na...
    

    3. 读取 SequenceFile 文件

    SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)

    val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
    
    //    // 保存 SequenceFile
    //    rdd1.saveAsSequenceFile("test_sequence")
    
    // 读取时需要指定读取数据的数据类型 [String, Int]
    val rdd2 = sc.sequenceFile[String, Int]("test_sequence")
    rdd2.collect().foreach(println)
    

    运行结果:

    (a,1)
    (b,2)
    (c,3)
    

    4. 读取 ObjectFile 文件

    对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制,可以通过 objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出

     // 保存
    val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
    rdd1.saveAsObjectFile("test_object_file")
    
    // 读取
    val rdd2 = sc.objectFile[(String, Int)]("test_object_file")
    
    rdd2.collect().foreach(println)
    

    5. 从 HDFS 读写文件

    Spark 的整个生态系统与 Hadoop 完全兼容的,所以对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样支持。

    Hadoop 有新旧两套 API 接口,为了能够兼容 Spark 也有两套,分别为:HadoopRDD 、newHadoopRDD,两个接口函数的参数分别为:

    • 输入格式(InputFormat):输入数据类型,新旧接口引用的版本分别为:org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
    • 键类型:[K, V] 中的 K
    • 值类型:[K, V] 中的 V
    • 分区值:指定由外部存储生成的 RDDpartition 数据的最小值,若没有指定,系统使用默认值 defaultMinSplits

    Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压

    如果用SparkHadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDDnewAPIHadoopRDD两个类就行了

    6. HBase 读写

    Spark 读取 HBase

    • 输入类型:org.apache.hadoop.hbase.mapreduce.TableInputFormat
    • 输出类型:结果为键值对,键的类型为 org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的类型为 org.apache.hadoop.hbase.client.Result

    连接集群

    spark 应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper

    • 第一种是将 hbase-site.xml 文件加入 classpath
    • 第二种是在 HBaseConfiguration 实例中设置

    6.1 Maven 依赖

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>servlet-api-2.5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
            </exclusion>
        </exclusions>
    
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>servlet-api-2.5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    

    6.2 读取 HBase 数据

    hbase 读取数据转化成 RDD

    package top.midworld.spark1016.hbase_access
    
    import java.util
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SparkSession
    import org.json4s.jackson.Serialization
    
    import scala.collection.mutable
    
    object HbaseRead {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")
    
        // 读取数据
        val rdd1 = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
    
        val resultRDD = rdd1.map {
          // it 封装了 rowkey、item 里面才是数据
          case (it, results) => {
            // 定义一个可变 map
            val resultMap = mutable.Map[String, Any]()
    
            // rowkey 添加到 resultMap 中
            resultMap += "rowKey" -> Bytes.toString(it.get())
    
            // 将其他数据添加到 resultMap 中
            val cells: util.List[Cell] = results.listCells()
    
            import scala.collection.JavaConversions._
            for (cell <- cells) {
              // 列名 - 列值
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
    
              resultMap += key -> value
            }
            // 把map转成json  json4s(json4scala)
            implicit val df = org.json4s.DefaultFormats
            Serialization.write(resultMap)
          }
        }
    
        resultRDD.collect().foreach(println)
        sc.stop()
      }
    }
    

    运行结果:

    {"alias2":"jun2","rowKey":"10004"}
    {"alias4":"jun4","rowKey":"10011"}
    {"alias5":"jun5","rowKey":"10016"}
    

    其他应用:

    val count = rdd1.count()
    println("rdd1 RDD Count:" + count)
    rdd1.cache()    // 缓存,避免 rdd 重新计算
    
    rdd1.foreach({
        case (_, results) => {
        val rowKey = Bytes.toString(results.getRow) 
        // info 为列族、alias2 为列名
        val name = Bytes.toString(results.getValue("info".getBytes, "alias2".getBytes))
    
        println(rowKey, name)
        }
    })
    

    6.2.1 sbt 打包编译 spark 程序

    上面使用 IDEAWindows 上测试,在 Linux 运行,需要将 spark 程序打包为 jar 包,常用的方法有:maven、sbt,这里采用 sbt

    1、环境准备

    • 开启 Hadoop、zookeeper、spark、hbase 集群
    • hbase/lib 中的一些 jar 包拷贝到 spark/jars/hbase
    // 在 spark 安装目录 jars 中新建 hbase/ 目录
    cd /home/hadoop/apps/spark-2.2.0/jars/
    mkdir hbase
    cd hbase
    
    // 拷贝以下 jar 包到 spark/jars/hbase 中
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/hbase*.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/guava-12.0.1.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/htrace-core-3.1.0-incubating.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/protobuf-java-2.5.0.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/metrics-core-2.2.0.jar .
    

    注意:缺少 metrics-core-2.2.0.jar 会报 Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge,可以参考:https://blog.csdn.net/u010842515/article/details/51451883

    2、编写 spark 程序:

    cd /home/hadoop/apps/spark-2.2.0/mycode/
    mkdir hbase
    cd hbase
    
    mkdir -p src/main/scala/
    vim SparkOperateHBase.scala
    

    spark 程序内容:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    import java.util
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.json4s.jackson.Serialization
    
    import scala.collection.mutable
    
    object SparkOperateHBase {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
    
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")
    
        // 读取数据
        val rdd1 = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
    
        val resultRDD = rdd1.map {
          // it 封装了 rowkey、item 里面才是数据
          case (it, results) => {
            // 定义一个可变 map
            val resultMap = mutable.Map[String, Any]()
    
            // rowkey 添加到 resultMap 中
            resultMap += "rowKey" -> Bytes.toString(it.get())
    
            // 将其他数据添加到 resultMap 中
            val cells: util.List[Cell] = results.listCells()
    
            import scala.collection.JavaConversions._
            for (cell <- cells) {
              // 列名 - 列值
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
    
              resultMap += key -> value
            }
            // 把map转成json  json4s(json4scala)
            implicit val df = org.json4s.DefaultFormats
            Serialization.write(resultMap)
          }
        }
    
        resultRDD.collect().foreach(println)
        sc.stop()
      }
    }
    

    3、编写 sbt 程序:

    vim simple.sbt
    
    // libraryDependencies 为 spark 程序中用到的依赖包
    
    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
    

    4、编译打包:

    // jar 包位置 /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    
    cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
    
    // 编译打包
    /home/hadoop/apps/sbt/run.sh package
    

    5、提交 spark 任务:

    cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
    
    /home/hadoop/apps/spark-2.2.0/bin/spark-submit --driver-class-path /home/hadoop/apps/spark-2.2.0/jars/hbase/*:/home/hadoop/apps/hbase-1.2.6/conf/ --class "SparkOperateHBase" /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    

    6.3 往 Hbase 写入数据

    package top.midworld.spark1016.hbase_access
    
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableReduce
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SparkSession
    
    object HBaseWrite {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stu")
    
        // 往 hbase 写入数据
        val job = Job.getInstance(hbaseConf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
    
        val dataRDD = sc.parallelize(
          List(
            ("10017", "scala", "1"),
            ("10018", "spark", "2"),
            ("10019", "java", "3")
          )
        )
    
        //    将 rdd 封装为 TableReduce 格式
        val hbaseRDD = dataRDD.map {
          case (rowKey, name, age) =>
            // 设置 rowKey
            val rk = new ImmutableBytesWritable()
            rk.set(Bytes.toBytes(rowKey))
    
            // 添加数据
            val put = new Put(Bytes.toBytes(rowKey))
            // 分别为列族:info、列名:name、值:name/age
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
    
            // return 返回
            (rk, put)
        }
    
        // 写入
        hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }
    
  • 相关阅读:
    开发脚本自动部署及监控
    内存进程与软件包安装
    Linux常用指令
    网络协议
    Python基础(二)
    python基础(一)
    shell编程
    正则与sed,grep,awk三剑客
    网络配置和元字符
    nginx
  • 原文地址:https://www.cnblogs.com/midworld/p/15647013.html
Copyright © 2011-2022 走看看