zoukankan      html  css  js  c++  java
  • 数据读取保存(五)

    Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统:

    • 文件格式:Text 文件、Json 文件、csv 文件、Sequence 文件以及 Object 文件
    • 文件系统:本地文件系统、HDFS、Hbase 以及数据库

    1. 读写 text/hdfs 文件

    text/hdfs 类型的文件读都可以用 textFile(path),保存使用 saveAsTextFile(path)

    // 读取本地文件,必须保证每个节点都有该文件
    val rdd = sc.textFile("./xx.txt")
    
    // 保存到 hdfs
    rdd.saveAsTextFile(hdfs://hadoop1:9000/test/info.json)
    

    2. 读取 json 文件

    json 文件主要是需要解析其 json 格式,一般采用:SparkSQL,也可以使用 fastjson、scala.util.parsing.json.JSON

    scala> val rdd = sc.textFile("hdfs://hadoop1:9000/test/info.json")
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop1:9000/test/info.json MapPartitionsRDD[45] at textFile at <console>:24
    
    scala> rdd.collect()
    res39: Array[String] = Array({"age": 0, "name": "rose0"}, {"age": 1, "name": "rose1"}, {"age": 2, "name": "rose2"}, {"age": 3, "name": "rose3"}, {"age": 4, "name": "rose4"}, {"age": 5, "name": "rose5"}, {"age": 6, "name": "rose6"}, {"age": 7, "name": "rose7"}, {"age": 8, "name": "rose8"}, {"age": 9, "name": "rose9"}, {"age": 10, "name": "rose10"}, {"age": 11, "name": "rose11"}, {"age": 12, "name": "rose12"}, {"age": 13, "name": "rose13"}, {"age": 14, "name": "rose14"}, {"age": 15, "name": "rose15"}, {"age": 16, "name": "rose16"}, {"age": 17, "name": "rose17"}, {"age": 18, "name": "rose18"}, {"age": 19, "name": "rose19"}, {"age": 20, "name": "rose20"}, {"age": 21, "name": "rose21"}, {"age": 22, "name": "rose22"}, {"age": 23, "name": "rose23"}, {"age": 24, "name": "rose24"}, {"age": 25, "...
    scala> import scala.util.parsing.json.JSON
    import scala.util.parsing.json.JSON
    
    // 解析到的结果其实就是 Option 组成的数组, Option 存储的就是 Map 对象
    scala> val rdd2 = rdd.map(JSON.parseFull).collect()
    rdd2: Array[Option[Any]] = Array(Some(Map(age -> 0.0, name -> rose0)), Some(Map(age -> 1.0, name -> rose1)), Some(Map(age -> 2.0, name -> rose2)), Some(Map(age -> 3.0, name -> rose3)), Some(Map(age -> 4.0, name -> rose4)), Some(Map(age -> 5.0, name -> rose5)), Some(Map(age -> 6.0, name -> rose6)), Some(Map(age -> 7.0, name -> rose7)), Some(Map(age -> 8.0, name -> rose8)), Some(Map(age -> 9.0, name -> rose9)), Some(Map(age -> 10.0, name -> rose10)), Some(Map(age -> 11.0, name -> rose11)), Some(Map(age -> 12.0, name -> rose12)), Some(Map(age -> 13.0, name -> rose13)), Some(Map(age -> 14.0, name -> rose14)), Some(Map(age -> 15.0, name -> rose15)), Some(Map(age -> 16.0, name -> rose16)), Some(Map(age -> 17.0, name -> rose17)), Some(Map(age -> 18.0, name -> rose18)), Some(Map(age -> 19.0, na...
    

    3. 读取 SequenceFile 文件

    SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)

    val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
    
    //    // 保存 SequenceFile
    //    rdd1.saveAsSequenceFile("test_sequence")
    
    // 读取时需要指定读取数据的数据类型 [String, Int]
    val rdd2 = sc.sequenceFile[String, Int]("test_sequence")
    rdd2.collect().foreach(println)
    

    运行结果:

    (a,1)
    (b,2)
    (c,3)
    

    4. 读取 ObjectFile 文件

    对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制,可以通过 objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出

     // 保存
    val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
    rdd1.saveAsObjectFile("test_object_file")
    
    // 读取
    val rdd2 = sc.objectFile[(String, Int)]("test_object_file")
    
    rdd2.collect().foreach(println)
    

    5. 从 HDFS 读写文件

    Spark 的整个生态系统与 Hadoop 完全兼容的,所以对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样支持。

    Hadoop 有新旧两套 API 接口,为了能够兼容 Spark 也有两套,分别为:HadoopRDD 、newHadoopRDD,两个接口函数的参数分别为:

    • 输入格式(InputFormat):输入数据类型,新旧接口引用的版本分别为:org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
    • 键类型:[K, V] 中的 K
    • 值类型:[K, V] 中的 V
    • 分区值:指定由外部存储生成的 RDDpartition 数据的最小值,若没有指定,系统使用默认值 defaultMinSplits

    Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压

    如果用SparkHadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDDnewAPIHadoopRDD两个类就行了

    6. HBase 读写

    Spark 读取 HBase

    • 输入类型:org.apache.hadoop.hbase.mapreduce.TableInputFormat
    • 输出类型:结果为键值对,键的类型为 org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的类型为 org.apache.hadoop.hbase.client.Result

    连接集群

    spark 应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper

    • 第一种是将 hbase-site.xml 文件加入 classpath
    • 第二种是在 HBaseConfiguration 实例中设置

    6.1 Maven 依赖

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>servlet-api-2.5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
            </exclusion>
        </exclusions>
    
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>servlet-api-2.5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    

    6.2 读取 HBase 数据

    hbase 读取数据转化成 RDD

    package top.midworld.spark1016.hbase_access
    
    import java.util
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SparkSession
    import org.json4s.jackson.Serialization
    
    import scala.collection.mutable
    
    object HbaseRead {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")
    
        // 读取数据
        val rdd1 = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
    
        val resultRDD = rdd1.map {
          // it 封装了 rowkey、item 里面才是数据
          case (it, results) => {
            // 定义一个可变 map
            val resultMap = mutable.Map[String, Any]()
    
            // rowkey 添加到 resultMap 中
            resultMap += "rowKey" -> Bytes.toString(it.get())
    
            // 将其他数据添加到 resultMap 中
            val cells: util.List[Cell] = results.listCells()
    
            import scala.collection.JavaConversions._
            for (cell <- cells) {
              // 列名 - 列值
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
    
              resultMap += key -> value
            }
            // 把map转成json  json4s(json4scala)
            implicit val df = org.json4s.DefaultFormats
            Serialization.write(resultMap)
          }
        }
    
        resultRDD.collect().foreach(println)
        sc.stop()
      }
    }
    

    运行结果:

    {"alias2":"jun2","rowKey":"10004"}
    {"alias4":"jun4","rowKey":"10011"}
    {"alias5":"jun5","rowKey":"10016"}
    

    其他应用:

    val count = rdd1.count()
    println("rdd1 RDD Count:" + count)
    rdd1.cache()    // 缓存,避免 rdd 重新计算
    
    rdd1.foreach({
        case (_, results) => {
        val rowKey = Bytes.toString(results.getRow) 
        // info 为列族、alias2 为列名
        val name = Bytes.toString(results.getValue("info".getBytes, "alias2".getBytes))
    
        println(rowKey, name)
        }
    })
    

    6.2.1 sbt 打包编译 spark 程序

    上面使用 IDEAWindows 上测试,在 Linux 运行,需要将 spark 程序打包为 jar 包,常用的方法有:maven、sbt,这里采用 sbt

    1、环境准备

    • 开启 Hadoop、zookeeper、spark、hbase 集群
    • hbase/lib 中的一些 jar 包拷贝到 spark/jars/hbase
    // 在 spark 安装目录 jars 中新建 hbase/ 目录
    cd /home/hadoop/apps/spark-2.2.0/jars/
    mkdir hbase
    cd hbase
    
    // 拷贝以下 jar 包到 spark/jars/hbase 中
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/hbase*.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/guava-12.0.1.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/htrace-core-3.1.0-incubating.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/protobuf-java-2.5.0.jar .
    [hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/metrics-core-2.2.0.jar .
    

    注意:缺少 metrics-core-2.2.0.jar 会报 Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge,可以参考:https://blog.csdn.net/u010842515/article/details/51451883

    2、编写 spark 程序:

    cd /home/hadoop/apps/spark-2.2.0/mycode/
    mkdir hbase
    cd hbase
    
    mkdir -p src/main/scala/
    vim SparkOperateHBase.scala
    

    spark 程序内容:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    import java.util
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.json4s.jackson.Serialization
    
    import scala.collection.mutable
    
    object SparkOperateHBase {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
    
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2")
    
        // 读取数据
        val rdd1 = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result]
        )
    
        val resultRDD = rdd1.map {
          // it 封装了 rowkey、item 里面才是数据
          case (it, results) => {
            // 定义一个可变 map
            val resultMap = mutable.Map[String, Any]()
    
            // rowkey 添加到 resultMap 中
            resultMap += "rowKey" -> Bytes.toString(it.get())
    
            // 将其他数据添加到 resultMap 中
            val cells: util.List[Cell] = results.listCells()
    
            import scala.collection.JavaConversions._
            for (cell <- cells) {
              // 列名 - 列值
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
    
              resultMap += key -> value
            }
            // 把map转成json  json4s(json4scala)
            implicit val df = org.json4s.DefaultFormats
            Serialization.write(resultMap)
          }
        }
    
        resultRDD.collect().foreach(println)
        sc.stop()
      }
    }
    

    3、编写 sbt 程序:

    vim simple.sbt
    
    // libraryDependencies 为 spark 程序中用到的依赖包
    
    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
    

    4、编译打包:

    // jar 包位置 /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    
    cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
    
    // 编译打包
    /home/hadoop/apps/sbt/run.sh package
    

    5、提交 spark 任务:

    cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
    
    /home/hadoop/apps/spark-2.2.0/bin/spark-submit --driver-class-path /home/hadoop/apps/spark-2.2.0/jars/hbase/*:/home/hadoop/apps/hbase-1.2.6/conf/ --class "SparkOperateHBase" /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
    

    6.3 往 Hbase 写入数据

    package top.midworld.spark1016.hbase_access
    
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableReduce
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SparkSession
    
    object HBaseWrite {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
        val sc = session.sparkContext
    
        // 连接 HBase 的配置
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stu")
    
        // 往 hbase 写入数据
        val job = Job.getInstance(hbaseConf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
    
        val dataRDD = sc.parallelize(
          List(
            ("10017", "scala", "1"),
            ("10018", "spark", "2"),
            ("10019", "java", "3")
          )
        )
    
        //    将 rdd 封装为 TableReduce 格式
        val hbaseRDD = dataRDD.map {
          case (rowKey, name, age) =>
            // 设置 rowKey
            val rk = new ImmutableBytesWritable()
            rk.set(Bytes.toBytes(rowKey))
    
            // 添加数据
            val put = new Put(Bytes.toBytes(rowKey))
            // 分别为列族:info、列名:name、值:name/age
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
    
            // return 返回
            (rk, put)
        }
    
        // 写入
        hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }
    
  • 相关阅读:
    net core 使用 rabbitmq
    asp.net core WebApi 返回 HttpResponseMessage
    asp.net core 2.1 WebApi 快速入门
    JQuery EasyUI combobox动态添加option
    php截取字符去掉最后一个字符
    JQuery EasyUI Combobox的onChange事件
    对于不返回任何键列信息的 selectcommand 不支持 updatecommand 的动态 sql 生成
    Access2007 操作或事件已被禁用模式阻止解决办法
    Easyui 中 Tabsr的常用方法
    Win 7 IE11不能下载文件,右键另存为也不行
  • 原文地址:https://www.cnblogs.com/midworld/p/15647013.html
Copyright © 2011-2022 走看看