zoukankan      html  css  js  c++  java
  • spark处理mongodb数据

     

    mongodb是一种文档型数据库,作为一个适用于敏捷开发的数据库,mongodb的数据模式可以随着应用程序的发展而灵活地更新。但是mongodb适合一次查询的需求,对于统计、分析(尤其是在需要跨表、跨库的情况下)并不是太方便,我们可以用spark来处理mongodb数据。架构图如下:

    我使用的spark版本是spark-1.6.2,mongodb版本是3.2。我主要接触了以下两种连接器:

    1、mongodb官方连接器

    github地址:https://github.com/mongodb/mongo-spark

    mongodb官方文档:https://docs.mongodb.com/spark-connector/

    api文档(java版):https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.0.0

    加载mongodb数据的方式如下:

    [python] view plain copy
     
    1. from pyspark import SparkConf, SparkContext  
    2. from pyspark.sql import SQLContext  
    3. from pyspark.sql.types import *  
    4. sc = SparkContext()  
    5. ctx = SQLContext(sc)  
    6. test_collection = ctx.read.format("com.mongodb.spark.sql").options(uri="mongodb://192.168.0.1:27017", database="test_db", collection="test_collection").load()  
    这样就把指定的mongodb实例中的test_db库中的test_collection表加载到了spark中。可以使用以下代码查看表的格式,查看第一行数据:
    [python] view plain copy
     
    1. test_collection.printSchema()  
    2. test_collection.first()  

    上面的这种方式加载时间较长,因为spark需要判断各个字段的类型,需要抽取部分数据判断(或者扫描整个表,我没有具体的研究过,总之比较慢)。而且这种方式会将所有的数据加载进来,有些字段我并不需要获取到。可以用下面的代码改进:
    [python] view plain copy
     
    1. fields_list = "name age sex grade exp"  
    2. fields = [StructField(field_name, StringType(), True) for field_name in fields_list.split()]  
    3. schema = StructType(fields)  
    4. test_collection = job_ctx.read.schema(schema).format("com.mongodb.spark.sql").options(uri="mongodb://192.168.0.1:27017", database="test_db", collection="test_collection").load()  
    首先选择了需要加载的字段(字段的顺序可以随意排列,也可以加上mongodb中不存在的字段),然后指定这些字段的类型,上面代码都指定为了spark中的stringtype类型。

    test_collection变量是spark中的dataframe类型数据,就可以使用spark  sql处理数据:

    [python] view plain copy
     
    1. test_collection.registerTempTable("Account")  
    2. sql = "select * from Account where age > '18'"  
    3. result = ctx.sql(sql)  
    result也是dataframe类型数据,也可以继续像上面这样的处理方式处理。
    如果有的需求不能通过spark sql实现,也可以使用下面代码转化成rdd格式数据,然后用map、reduce等函数处理。
    [python] view plain copy
     
    1. test_rdd = test_collection.rdd  

    在数据通过sql或者自己写map、reduce函数处理完之后需要将数据保存到mongodb中。如果数据是dataframe格式:
    [python] view plain copy
     
    1. result.write.format("com.mongodb.spark.sql").mode("overwrite").options(uri="mongodb://192.168.0.1:27017", database="test_db", collection="test_collection_out").load()  

    其中的mode中填写写数据的模式,官方的连接器提供了四种模式:overwrite ignore  errorifexists append,本来以为这些模式是针对行级别的,后来看了源代码发现是针对collection级别的,官方代码如下:
    [plain] view plain copy
     
    1. override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {  
    2.     val writeConfig = WriteConfig(sqlContext.sparkContext.getConf, parameters)  
    3.     val mongoConnector = MongoConnector(writeConfig.asOptions)  
    4.     lazy val collectionExists: Boolean = mongoConnector.withDatabaseDo(  
    5.       writeConfig, { db => db.listCollectionNames().asScala.toList.contains(writeConfig.collectionName) }  
    6.     )  
    7.     mode match {  
    8.       case Append => MongoSpark.save(data, writeConfig)  
    9.       case Overwrite =>  
    10.         mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[Document] => collection.drop() })  
    11.         MongoSpark.save(data, writeConfig)  
    12.       case ErrorIfExists =>  
    13.         if (collectionExists) {  
    14.           throw new UnsupportedOperationException("MongoCollection already exists")  
    15.         } else {  
    16.           MongoSpark.save(data, writeConfig)  
    17.         }  
    18.       case Ignore =>  
    19.         if (!collectionExists) {  
    20.           MongoSpark.save(data, writeConfig)  
    21.         }  
    22.     }  
    23.     createRelation(sqlContext, parameters ++ writeConfig.asOptions, Some(data.schema))  
    24.   }  

    代码链接:https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/sql/DefaultSource.scala

    从代码中可以看出,overwrite就是先删除mongodb中指定的表,然后把数据写到这个表中;ignore就是如果mongodb中有这个表,就不写数据了,且不会报错;errorifexists就是如果mongodb中存在这个表就报错,如果不存在就正常写入;append就是不管mongodb中这个表存不存在直接往里写数据。分这么多模式其实没啥大用,如果不看代码容易理解出现偏差。

    如果是rdd类型的数据就需要先转化成dataframe格式再保存到mongodb中,例如:

    [python] view plain copy
     
    1. fields_list = "name age sex grade exp"  
    2. fields = [StructField(field_name, StringType(), True) for field_name in fields_list]  
    3. schema = StructType(fields)  
    4. df = ctx.createDataFrame(result, schema=schema)  
    5. df.write.format("com.mongodb.spark.sql").mode("overwrite").options(uri="mongodb://192.168.0.1:27017", database="test_db", collection="test_collection_out").load()  

    2、第三方连接器

    github地址:https://github.com/Stratio/Spark-MongoDB

    这个连接器的使用方式基本上和官方连接器一样,获取数据的代码如下:

    [python] view plain copy
     
    1. test_collection = ctx.read.schema(schema).format("com.stratio.datasource.mongodb").options(host="192.168.0.1:27017", database="test", collection="test_collection").load()  
    可以看出,只是format做了改变,options中的uri改成了host。

    写数据到mongodb的方法和官方连接器也差不多:

    [python] view plain copy
     
    1. result.write.format("com.stratio.datasource.mongodb").mode("append").options(host="192.168.0.1:27017", database="test", collection="test_collection_out", updateFields='name').save()  
    这个第三方连接器再写入数据到mongodb时的功能比官方连接器要好很多,它也提供了overwrite ignore  errorifexists append这四种模式,可以通过下面这些连接器的源代码看出它的处理方式:
    [plain] view plain copy
     
    1. mode match{  
    2.       case Append         => mongodbRelation.insert(data, overwrite = false)  
    3.       case Overwrite      => mongodbRelation.insert(data, overwrite = true)  
    4.       case ErrorIfExists  => if(mongodbRelation.isEmptyCollection) mongodbRelation.insert(data, overwrite = false)  
    5.       else throw new UnsupportedOperationException("Writing in a non-empty collection.")  
    6.       case Ignore         => if(mongodbRelation.isEmptyCollection) mongodbRelation.insert(data, overwrite = false)  
    7.     }  
    [plain] view plain copy
     
    1. def insert(data: DataFrame, overwrite: Boolean): Unit = {  
    2.     if (overwrite) {  
    3.       usingMongoClient(MongodbClientFactory.getClient(config.hosts, config.credentials, config.sslOptions, config.clientOptions).clientConnection) { mongoClient =>  
    4.         dbCollection(mongoClient).dropCollection()  
    5.       }  
    6.     }  
    7.   
    8.     data.saveToMongodb(config)  
    9.   }  
    [plain] view plain copy
     
    1. def saveToMongodb(config: Config, batch: Boolean = true): Unit = {  
    2.     val schema = dataFrame.schema  
    3.     dataFrame.foreachPartition(it => {  
    4.       val writer =  
    5.         if (batch) new MongodbBatchWriter(config)  
    6.         else new MongodbSimpleWriter(config)  
    7.   
    8.       writer.saveWithPk(  
    9.         it.map(row => MongodbRowConverter.rowAsDBObject(row, schema)))  
    10.   
    11.     })  
    12.   }  
    [plain] view plain copy
     
    1. private[mongodb] class MongodbBatchWriter(config: Config) extends MongodbWriter(config) {  
    2.   
    3.   private val IdKey = "_id"  
    4.   
    5.   private val bulkBatchSize = config.getOrElse[Int](MongodbConfig.BulkBatchSize, MongodbConfig.DefaultBulkBatchSize)  
    6.   
    7.   private val pkConfig: Option[Array[String]] = config.get[Array[String]](MongodbConfig.UpdateFields)  
    8.   
    9.   override def save(it: Iterator[DBObject], mongoClient: MongoClient): Unit = {  
    10.     it.grouped(bulkBatchSize).foreach { group =>  
    11.       val bulkOperation = dbCollection(mongoClient).initializeUnorderedBulkOperation  
    12.       group.foreach { element =>  
    13.         val query = getUpdateQuery(element)  
    14.         if (query.isEmpty) bulkOperation.insert(element)  
    15.         else bulkOperation.find(query).upsert().replaceOne(element)  
    16.       }  
    17.   
    18.       bulkOperation.execute(writeConcern)  
    19.     }  
    20.   }  

    可以看出,这四种模式的功能总体上是和官方连接器一致的,但是在写入mongodb时处理方式不同。在这里可以使用一个updateFields参数,这个参数表示以这个参数的值为key,如果查询到mongodb的表中已经有相同key的数据,会替换掉这行数据,如果没有则直接写入。在很多业务场景下,只需要在原表的基础上更新一部分数据,用这个第三方连接器就非常方便。

    3、连接器的选择

    在获取数据时,官方连接器的性能似乎比第三方连接器的好一点,官方连接器有一个条件下推的原则。我们知道spark的算子分为两种:Transformation和Action,只有遇到Action算子才会触发作业的提交。比如在后续的一些Transformation算子中对数据有一定的数据过滤条件,官方连接器会把过滤条件下推到MongoDB去执行,这样可以保证从MongoDB取出来、经过网络传输到Spark计算节点的数据确实都是用得着的。第三方连接器似乎会把所有数据加载到spark后再过滤(没有仔细求证)。

    在写数据到mongodb时,通过上面贴出来的代码可以看出,第三方连接器的功能比官方连接器的要好一点,支持在原有表的基础上做更新。

    4、连接器的使用

    首先要安装spark(如果不需要把数据保存到hdfs、不需要使用yarn,可以不安装hadoop),在spark目录下的bin目录下会有一个spark-submit可执行文件。

    例如把代码保存在test.py中,如果使用官方连接器,运行:

    [plain] view plain copy
     
    1. spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 test.py  

    如果使用的是第三方连接器,运行:
    [plain] view plain copy
     
    1. spark-submit --packages com.stratio.datasource:spark-mongodb_2.10:0.11.2   test.py  
    第一次执行时会下载相应的工具包,国内网络可能下载不下来,我上传到了csdn,可以到下面连接下载:

    官方:http://download.csdn.net/detail/hellousb2010/9698375

    第三方:http://download.csdn.net/detail/hellousb2010/9698372

    解压后将cache、jar目录拷贝到~/.ivy2目录下即可。

  • 相关阅读:
    005.hive分组求中位数--sort_array(collect_list(),insert overwrite table --- 实现字段update操作
    004.hive 里的向上取整、向下取整、四舍五入取整
    003.hive选择tableName表中除了name、id、pwd之外的所有字段
    003.flink-flink集群安装HA
    002.flink打jar包时只打包代码,依赖jar包批量指定
    001.如何通过进程号查看进程所依赖的jar包,cd /proc/92232/fd
    002.hive-UDF自定义函数
    02.spark-spark分布式安装
    052.PGSQL-search_path参数 修改了postgresql.conf没有生效 ,应该修改这个配置postgresql.auto.conf文件,postgresql.auto.conf修改系统文件。配置是否被成功应用 pg_file_settings
    devexpress gridview样式设置
  • 原文地址:https://www.cnblogs.com/hejianxin/p/8066760.html
Copyright © 2011-2022 走看看