  • Spark (10) [Reading and Saving RDDs]

    I. File Types

    1. Text Files

    Read and Write

    Read:
    scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
    hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
    Save:
    scala> hdfsFile.saveAsTextFile("/fruitOut")
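
    textFile also takes an optional second argument that sets a minimum partition count, and saveAsTextFile writes one part-NNNNN file per output partition. A small spark-shell sketch (the /fruitOutSingle output path is only illustrative):
    scala> val lines = sc.textFile("hdfs://hadoop102:9000/fruit.txt", 4)   // read with at least 4 partitions
    scala> lines.getNumPartitions
    scala> lines.coalesce(1).saveAsTextFile("/fruitOutSingle")             // coalesce to write a single part file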
    

    2. JSON Files

    Parsing JSON from a raw RDD is cumbersome, while Spark SQL has good built-in support for JSON, so in practice JSON files are usually handled with Spark SQL (a short Spark SQL sketch follows the RDD steps below).

    (1) Import the package needed to parse JSON
    scala> import scala.util.parsing.json.JSON
    (2) Upload the JSON file to HDFS
    [atguigu@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
    (3) Read the file
    scala> val json = sc.textFile("/people.json")
    json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
    (4) Parse the JSON data
    scala> val result  = json.map(JSON.parseFull)
    result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
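
    As noted above, in practice JSON is usually read through Spark SQL rather than parsed out of an RDD by hand. A minimal sketch, assuming the spark-shell's built-in SparkSession (the spark variable) and the same /people.json file:
    scala> val df = spark.read.json("/people.json")   // reads the file and infers the schema
    scala> df.show()
    scala> val rowRdd = df.rdd                        // convert back to an RDD if one is needed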
    

    3. Object Files

    An object file stores objects in serialized form, using Java serialization. sc.objectFile[T](path) takes a path, reads an object file and returns the corresponding RDD, while saveAsObjectFile() writes an RDD out as an object file. Because the data is Java-serialized, the element type must be specified when reading.

    Read and Write

    (1) Create an RDD
    scala> val rdd = sc.parallelize(Array(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
    (2) Save the RDD as an object file
    scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
    (3) Inspect the output files
    [hadoop@hadoop102 objectFile]$ pwd
    /opt/module/spark/objectFile
    
    [hadoop@hadoop102 objectFile]$ ll
    total 8
    -rw-r--r-- 1 atguigu atguigu 142 Oct  9 10:37 part-00000
    -rw-r--r-- 1 atguigu atguigu 142 Oct  9 10:37 part-00001
    -rw-r--r-- 1 atguigu atguigu   0 Oct  9 10:37 _SUCCESS
    
    [hadoop@hadoop102 objectFile]$ cat part-00000 
    SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
    (4) Read the object file
    scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
    objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
    (5) Print the contents read back
    scala> objFile.collect
    res19: Array[Int] = Array(1, 2, 3, 4)
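
    Since the element type is supplied as a type parameter, the same pair of calls works for any serializable element type, including pair RDDs. A short spark-shell sketch (the output path is hypothetical):
    scala> val pairs = sc.parallelize(Array(("a", 1), ("b", 2)))
    scala> pairs.saveAsObjectFile("file:///opt/module/spark/pairObjFile")
    scala> val back = sc.objectFile[(String, Int)]("file:///opt/module/spark/pairObjFile")
    scala> back.collect   // expected: Array((a,1), (b,2))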
    

    4. Sequence Files

    Rarely used nowadays.

    Note: SequenceFile output only works for pair RDDs (RDD[(K, V)]).
    (1) Create a pair RDD
    scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
    (2) Save the RDD as a SequenceFile
    scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
    (3) Inspect the output files
    [hadoop@hadoop102 seqFile]$ pwd
    /opt/module/spark/seqFile
    
    [hadoop@hadoop102 seqFile]$ ll
    total 8
    -rw-r--r-- 1 atguigu atguigu 108 Oct  9 10:29 part-00000
    -rw-r--r-- 1 atguigu atguigu 124 Oct  9 10:29 part-00001
    -rw-r--r-- 1 atguigu atguigu   0 Oct  9 10:29 _SUCCESS
    [hadoop@hadoop102 seqFile]$ cat part-00000
    SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
    (4) Read the SequenceFile
    scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
    seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
    (5) Print the contents read back
    scala> seq.collect
    res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
    

    II. External Storage Systems

    1. MySQL

    Dependency

      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.27</version>
      </dependency>
    

    Read

    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description: Read data from MySQL
     * @author: HaoWu
     * @create: 2020-08-05
     */
    object MySqlReadWriteTest {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
        val sc: SparkContext = new SparkContext(sparkConf)
        val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(
          sc,
          () => {
            Class.forName("com.mysql.jdbc.Driver").newInstance()
            DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
          },
          "select * from project_files where project_id >= ? and project_id <= ?;",
          1,   // lower bound (substituted for the first ?)
          4,   // upper bound (substituted for the second ?)
          1,   // number of partitions
          // mapping function: turn each row of the JDBC ResultSet into a tuple
          r => (r.getInt(1), r.getString(2)))
        println(rdd.count())
        rdd.foreach(println(_))
        sc.stop()
      }
    }
    

    Write

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.spark.rdd.{JdbcRDD, RDD}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description: Insert data into MySQL
     * @author: HaoWu
     * @create: 2020-08-05
     */
    object MySqlReadWriteTest {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
        val sc: SparkContext = new SparkContext(sparkConf)
        val list = List((1, 9), (1, 10))
        val rdd: RDD[(Int, Int)] = sc.makeRDD(list)
        // foreachPartition is more efficient: one MySQL connection per partition instead of one per record
        rdd.foreachPartition(iter => {
          // create the JDBC Connection
          val con: Connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
          // prepare the SQL
          val sql="insert into project_files(project_id,version) values(?,?)"
    
          // PreparedStatement
          val ps: PreparedStatement = con.prepareStatement(sql)
          // insert this partition's records one by one
          iter.foreach{
            case(project_id,version) => {
              // bind the int parameters
              ps.setInt(1,project_id)
              ps.setInt(2,version)
              // execute the insert
              ps.executeUpdate()
            }
          }
          ps.close()
          con.close()
        })
        sc.stop()
      }
    }
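
    The version above still issues one executeUpdate per record. Below is a hedged sketch of a common refinement, JDBC batching inside each partition, reusing the same hypothetical database, table and credentials as the example above:

    import java.sql.DriverManager
    import org.apache.spark.{SparkConf, SparkContext}

    object MySqlBatchWriteSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("JdbcBatchApp"))
        val rdd = sc.makeRDD(List((1, 9), (1, 10)))
        rdd.foreachPartition { iter =>
          // one connection per partition, as in the example above
          val con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
          val ps = con.prepareStatement("insert into project_files(project_id,version) values(?,?)")
          iter.foreach { case (projectId, version) =>
            ps.setInt(1, projectId)
            ps.setInt(2, version)
            ps.addBatch()       // queue the insert instead of executing it immediately
          }
          ps.executeBatch()     // execute all queued inserts as a batch
          ps.close()
          con.close()
        }
        sc.stop()
      }
    }

    Note: with MySQL Connector/J, sending the batch as true multi-row inserts also requires rewriteBatchedStatements=true on the JDBC URL.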
    

    2. HBase

    Dependencies

       <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-server</artifactId>
           <version>2.0.0</version>
       </dependency>
    
       <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-client</artifactId>
           <version>2.0.0</version>
       </dependency>
    
       <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-mapreduce</artifactId>
           <version>2.0.0</version>
       </dependency>
    

    Put HBase's configuration file hbase-site.xml into the resources directory, keeping at least the ZooKeeper connection property:

    	<property>
    		<name>hbase.zookeeper.quorum</name>
    		<value>hadoop102,hadoop103,hadoop104</value>
    	</property>
    

    Read

    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.NewHadoopRDD
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     * @description: Read from HBase
     * @author: HaoWu
     * @create: 2020-08-05
     */
    object HbaseReadWriterTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        // Create the configuration; by default it picks up the Hadoop and HBase config files on the classpath
        val conf: Configuration = HBaseConfiguration.create()
        // set which table to read
        conf.set(TableInputFormat.INPUT_TABLE, "bigdata:user")
        // core step: create the RDD over the HBase table
        val rdd = new NewHadoopRDD[ImmutableBytesWritable, Result](sc,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result],
          conf)
        rdd.foreach {
          case (rowKey, result) => {
            // CellUtil extracts the parts of a Cell; Bytes converts between Java types and byte[]
            // get all cells of this row
            val cells: Array[Cell] = result.rawCells()
            for (cell <- cells) {
              println(Bytes.toString(CellUtil.cloneRow(cell)) + " " +
                Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " " +
                Bytes.toString(CellUtil.cloneValue(cell)))
            }
          }
        }
      }
    }
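
    Instantiating NewHadoopRDD directly works, but the same read is more often written with the SparkContext.newAPIHadoopRDD helper, which takes the identical input-format, key and value classes. A minimal sketch against the same bigdata:user table, printing only the row keys:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object HbaseNewApiReadSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("HbaseRead").setMaster("local[*]"))
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE, "bigdata:user")
        // same input format, key and value classes, created through the SparkContext helper
        val hbaseRdd = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result])
        // extract and print only the row keys
        hbaseRdd.map { case (_, result) => Bytes.toString(result.getRow) }
          .collect()
          .foreach(println)
        sc.stop()
      }
    }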
    

    Write

    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     * @description: Write to HBase
     * @author: HaoWu
     * @create: 2020-08-05
     */
    object HbaseReadWriterTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        // Create the configuration; by default it picks up the Hadoop and HBase config files on the classpath
        val conf: Configuration = HBaseConfiguration.create()
        // set which table to write to
        conf.set(TableOutputFormat.OUTPUT_TABLE, "bigdata:user")
    
        // wrap the configuration in a Job to set the output parameters
        val job: Job = Job.getInstance(conf)
    
        // set the output format
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
        // set the output key and value types
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
    
        // package the data in a list as (rowkey, (column family, qualifier, value))
        val list = List(("1005", ("info2", "age", "20")), ("1005",( "info2", "name", "marry")), ("1006", ("info2", "age", "21")))
    
        val rdd: RDD[(String, (String, String, String))] = sc.makeRDD(list, 2)
    
        // map the data into the output key-value types
        val rdd2: RDD[(ImmutableBytesWritable, Put)] = rdd.map {
          case (rowkey, (cf, cq, v)) => {
            // wrap the rowkey
            val key = new ImmutableBytesWritable()
            key.set(Bytes.toBytes(rowkey))
            // build the Put
            val value = new Put(Bytes.toBytes(rowkey))
            value.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(v))
            (key, value)
          }
        }
    
        // pass in the configuration prepared above
        rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)
      }
    }
    

    A pitfall I ran into

    While running the HBase read job, the program threw the error below.

    Cause: in the pom, the HBase dependencies must be placed after the spark-core dependency; otherwise this error appears. The dependency order changes which jackson-databind version Maven resolves, and the HBase artifacts pull in a version that conflicts with the Jackson Scala module Spark uses, as the root cause in the stack trace shows.

    java.lang.ExceptionInInitializerError
    	at org.apache.spark.SparkContext.withScope(SparkContext.scala:751)
    	at org.apache.spark.SparkContext.textFile(SparkContext.scala:882)
    	at com.spark.rdd.RDDTest.testMap(RDDTest.scala:62)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
    	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
    	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
    	at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
    	at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
    	at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
    	at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
    	... 27 more
    
  • Original post: https://www.cnblogs.com/wh984763176/p/13444602.html