参考链接
- MongoDB Connector for Spark官方文档
- Mongo Spark 源码
- 原文:https://www.jianshu.com/p/dbac491317cc
场景:适用于数据清洗,如只需要部分字段:
依赖:
<dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.11</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.2</version> </dependency>
代码:
package com.edurt.ssi import com.mongodb.spark._ import org.apache.spark.{SparkConf, SparkContext} import org.bson._ object MongoSparkTest { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[4]") .setAppName("Mingdao-Score") .set("spark.mongodb.input.uri", "mongodb://192.168.18.129:27017/swift.booking") .set("spark.mongodb.output.uri", "mongodb://192.168.18.129:27017/outputDB.collectionName") // import com.mongodb.spark.config._ // val readConfig = ReadConfig(Map("collection" -> "employee", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc))) //同时还支持mongo驱动的readPreference配置, 可以只从secondary读取数据 // .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName") // .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName") val sc = new SparkContext(conf) // 创建rdd // val customRdd = MongoSpark.load(sc, readConfig) val originRDD = MongoSpark.load(sc) // 构造查询 // val dateQuery = new BsonDocument() // .append("$gte", new BsonDateTime(start.getTime)) // .append("$lt", new BsonDateTime(end.getTime)) val matchQuery = new Document("$match", BsonDocument.parse("{"businessType":"B2B"}")) // 构造Projection val projection1 = new BsonDocument("$project", BsonDocument.parse("{"businessType":"$businessType","bookingNo":"$bookingNo","status":"$status"}")) val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1)) //比如分组 val rdd1 = aggregatedRDD.keyBy(x=>{ Map( "businessType" -> x.get("businessType") ) }) // val rdd2 = rdd1.groupByKey.map(t=>{ // (t._1, t._2.map(x => { // x.getString("message").length // }).sum) // }) rdd1.collect().foreach(x=>{ println(x) }) //保持统计结果至MongoDB outputurl 所指定的数据库 MongoSpark.save(aggregatedRDD) } }