zoukankan      html  css  js  c++  java
  • Spark数据存储和分区操作

    Spark数据读取

    • 对于存储在本地文件系统或分布式文件系统(HDFS、Amazon S3)中的数据,Spark可以访问很多种不同的文件格式,比如文本文件、JSON、SequenceFile
    • Spark SQL中的结构化数据源,包括JSON和Hive的结构化数据源
    • 数据库和键值存储,自带的库,联结HBase或其他JDBC源
    格式名称 结构化 备注
    文本文件 普通的文本文件,每行一条记录
    JSON 半结构化 每行一条记录
    CSV 非常常见的基于文本的格式
    SequenceFiles 用于键值对的常见Hadoop文件格式

    textFile()和saveAsTextFile(),读取文本文件和保存为文本文件。

    读取JSON数据的方式是将数据作为文本文件读取,然后使用JSON解析器对RDD中的值进行映射操作。

    import json
    data = input.map(lambda x: json.loads(x))
    
    //保存JSON
    (data.filter(lambda x: x["lovesPands"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
    

    Spark有专门用来读取SequenceFile的接口,可以调用sequenceFile(path,keyClass,valueClass,minparttions)来读取。

    val data = sc.sequenceFile(inFile,"org.apache.hadoop.io.Text","org.apache.hadoop.io.IntWritable")
    

    文件压缩

    对数据进行压缩以节省存储空间和网络传输开销。Spark原生的输入方式(texeFile和sequenceFile)可以自动处理一类型的压缩。

    文件系统

    • 本地文件,file:///home/path
    • Amazon S3,s3n://bucket/path
    • HDFS,hdfs://master/path

    数据库

    Java数据库连接,需要构建一个org.apache.spark.rdd.jdbcRDD,将SparkContext和其他参数一起传给它

    //Scala
    def createConnect() = {
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
    }
    def extractValues(r: ResultSet) = {
        (r.getInt(1),r.getString(2))
    }
    
    val data = new JdbcRDD(sc,createConnection,"SELECT * FROM panda WHERE ",lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)
    
    • 提供一个用于数据库连接的函数
    • 提供一个可以读取一定范围内数据的查询,以及查询参数中lowerBound和upperBound的值。
    • 最后一个参数可以将输出结果从java.sql.ResultSet转为对操作数据有用的格式的函数。

    Spark可以用org.apache.hadoop.hbase.mapreduce.TableInputFormat类通过Hadoop输入格式访问HBase。键的类型为org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的类型为org.apache.hadoop.hbase.client.Result。

    //Scala
    import org.apache.hadoop.hbase.HBaseConfigration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE,"tablename")
    
    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
    

    分区操作

    分区的作用:

    • 可以增加并行度,在多个节点上同时计算。
    • 减少通信开销。join()时,减少shuffle。

    分区原则:分区个数等于本地机器CPU数目

    rdd.sc.parallelize(list,2)//设置两个分区
    rdd.repartition(1)//重新分区,分1个
    

    分区方法有三种:

    1. 哈希分区,HashPartitioner
    2. 区域分区,RangePartitioner
    3. 自定义分区
    def Mypartition(key):
        return key % 10;
    

    可以在每个分区共享一个数据库连接池,避免建立太多连接

    def processCallsigns(signs):
        http = urllib3.PoolManager()//建立连接池
        urls = map()//操作
        ···
    
  • 相关阅读:
    【来自知乎】AR技术可以通过H5实现吗?不通过APP
    太虚AR
    【ArUco】- Augmented reality library based on OpenCV
    unity MVC简易框架! MVC in Code Control
    游戏服务器框架与互联网产品的认识
    关于 boost::asio::io_service::run() 出现【句柄无效】的问题
    编译luabind-0.9.1 出现 error C2665: 'boost::operator ==' : none of the 4 overloads could convert all the argument types 的解决方案
    javascript 控制 table tr display block 显示模式时,只对第一个单元格有效
    Ogre::UINT 与 其他库的 类型冲突问题
    排序仿函数 出现 std::priority_queue::push() 的 invalid operator < 异常
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12425488.html
Copyright © 2011-2022 走看看