  • spark-mongo (1: reading and writing data)

    Reference link

    Scenario: suited to data cleaning, e.g. when only a subset of fields is needed:

    Dependencies:

        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
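
    If the project builds with sbt rather than Maven, the equivalent coordinates would look like this (a minimal sketch, assuming the same artifact versions as above):

        libraryDependencies ++= Seq(
          "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0",
          "org.apache.spark"  %% "spark-core"            % "2.0.2"
        )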

    Code:

    package com.edurt.ssi

    import com.mongodb.spark._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.bson._

    object MongoSparkTest {

      def main(args: Array[String]): Unit = {

        val conf = new SparkConf()
          .setMaster("local[4]")
          .setAppName("Mingdao-Score")
          .set("spark.mongodb.input.uri", "mongodb://192.168.18.129:27017/swift.booking")
          .set("spark.mongodb.output.uri", "mongodb://192.168.18.129:27017/outputDB.collectionName")
        // A per-collection override can also be built from a ReadConfig:
        //   import com.mongodb.spark.config._
        //   val readConfig = ReadConfig(Map("collection" -> "employee", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
        // The MongoDB driver's readPreference option is supported as well, so reads can be served by secondaries only:
        //   .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
        //   .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")

        val sc = new SparkContext(conf)

        // Create the RDD from the input URI
        //   val customRdd = MongoSpark.load(sc, readConfig)
        val originRDD = MongoSpark.load(sc)

        // Build the $match stage (note the escaped quotes inside the JSON string)
        //   val dateQuery = new BsonDocument()
        //     .append("$gte", new BsonDateTime(start.getTime))
        //     .append("$lt", new BsonDateTime(end.getTime))
        val matchQuery = new Document("$match", BsonDocument.parse("{\"businessType\":\"B2B\"}"))

        // Build the $project stage: keep only the fields needed for cleaning
        val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"businessType\":\"$businessType\",\"bookingNo\":\"$bookingNo\",\"status\":\"$status\"}"))
        val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

        // For example, key the records for grouping
        val rdd1 = aggregatedRDD.keyBy(x => {
          Map(
            "businessType" -> x.get("businessType")
          )
        })

        //   val rdd2 = rdd1.groupByKey.map(t => {
        //     (t._1, t._2.map(x => {
        //       x.getString("message").length
        //     }).sum)
        //   })

        rdd1.collect().foreach(x => {
          println(x)
        })

        // Save the results to the database named by spark.mongodb.output.uri
        MongoSpark.save(aggregatedRDD)
      }

    }
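
    The commented-out ReadConfig line above points a single load at a different collection; WriteConfig does the same for saves. A minimal sketch of both (the collection names "employee" and "bookingCleaned" are placeholders, not from the original post):

    import com.mongodb.spark._
    import com.mongodb.spark.config._

    // Read: override the collection and read preference for this load only;
    // Some(ReadConfig(sc)) falls back to spark.mongodb.input.uri for the rest.
    val readConfig = ReadConfig(
      Map("collection" -> "employee", "readPreference.name" -> "secondaryPreferred"),
      Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc, readConfig)

    // Write: direct the save at a specific collection instead of the one
    // named in spark.mongodb.output.uri.
    val writeConfig = WriteConfig(
      Map("collection" -> "bookingCleaned"),
      Some(WriteConfig(sc)))
    MongoSpark.save(aggregatedRDD, writeConfig)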
  • Original post: https://www.cnblogs.com/lshan/p/13631648.html