zoukankan      html  css  js  c++  java
  • 小记--------scala连接分片式集群Mongo数据库:操作集合以及写入DataFrame数据

    scala 连接mongoClient(分片式集群)

    import com.df.Contant.GlobalConfigUtils
    import com.mongodb.{MongoClient, MongoClientURI}
    import org.apache.spark.sql.DataFrame
    ​
    /**
      * 连接分片式mongo集群
      * 先连接mongoclient 进行删除历史表
      * 再将新数据写入
      */
    object WriteMongoCluster {
    ​
    ​
      /**
        * 落地数据到mongo
        * */
      def write(df:DataFrame , collectionName:String): Unit ={      
          
    // uri示例 val outputUri="mongodb://root:password@ip1:27017,ip2:27017,ip3:27017/admin"
      // 注意:uri末尾的admin是关键词。没有连接会报错    
    val url="mongodb://"+
      GlobalConfigUtils.mongoUserName+
      ":"+GlobalConfigUtils.mongoPasswd+
      "@"+GlobalConfigUtils.mongoHost+
      ":"+GlobalConfigUtils.mongoPort+
      "," +GlobalConfigUtils.mongoHost2+
      ":"+GlobalConfigUtils.mongoPort+
      ","+GlobalConfigUtils.mongoHost3+
      ":"+GlobalConfigUtils.mongoPort+
      "/admin"
       
       /**
         * 需要注意的是:MongoClientURI、MongoClient 是org.mongodb.{MongoClientURI , MongoClient}
         * */
          // 配置MongoClientURI
        val urls:MongoClientURI = new MongoClientURI(url)
          // 连接mongoclient
        val mongoClient = new MongoClient(urls)
          // 获取数据库
        val db = mongoClient.getDatabase(GlobalConfigUtils.mongoDBname)
          // 获取集合
        val collection = db.getCollection(collectionName)
          // 删除集合
        collection.drop()
    ​
    // 分片式集群URI ,通过配置文件传入参数
    val outputUri: String = s"mongodb://" +
      s"${GlobalConfigUtils.mongoUserName}:${GlobalConfigUtils.mongoPasswd}@${GlobalConfigUtils.mongoHost}:${GlobalConfigUtils.mongoPort},${GlobalConfigUtils.mongoHost2}:${GlobalConfigUtils.mongoPort},${GlobalConfigUtils.mongoHost3}:${GlobalConfigUtils.mongoPort}/${GlobalConfigUtils.mongoDBname}.${collectionName}"
          
        // 数据落地mongo
          // 需要注意Map中的uri是需要collectionName, 也就是mongo中的集合名
        df.write.options(Map("spark.mongodb.output.uri"-> outputUri))
          .mode("append")
          .format("com.mongodb.spark.sql")
          .save()
      }
    }

     

     

  • 相关阅读:
    Http 请求处理流程
    ASP.NET 4.0: 请求验证模式变化导致ValidateRequest=false失效
    Android Animation学习笔记【转载】
    Http Handler 介绍
    对路径XXX的访问被拒绝(文件操作权限)的解决方法
    控制页面滚动条
    Sqlserver查询字段默认值
    Ajax基础
    浮动&定位
    定时器
  • 原文地址:https://www.cnblogs.com/yzqyxq/p/12786823.html
Copyright © 2011-2022 走看看