scala 连接mongoClient(分片式集群)
import com.df.Contant.GlobalConfigUtils import com.mongodb.{MongoClient, MongoClientURI} import org.apache.spark.sql.DataFrame /** * 连接分片式mongo集群 * 先连接mongoclient 进行删除历史表 * 再将新数据写入 */ object WriteMongoCluster { /** * 落地数据到mongo * */ def write(df:DataFrame , collectionName:String): Unit ={ // uri示例 val outputUri="mongodb://root:password@ip1:27017,ip2:27017,ip3:27017/admin" // 注意:uri末尾的admin是关键词。没有连接会报错 val url="mongodb://"+ GlobalConfigUtils.mongoUserName+ ":"+GlobalConfigUtils.mongoPasswd+ "@"+GlobalConfigUtils.mongoHost+ ":"+GlobalConfigUtils.mongoPort+ "," +GlobalConfigUtils.mongoHost2+ ":"+GlobalConfigUtils.mongoPort+ ","+GlobalConfigUtils.mongoHost3+ ":"+GlobalConfigUtils.mongoPort+ "/admin" /** * 需要注意的是:MongoClientURI、MongoClient 是org.mongodb.{MongoClientURI , MongoClient} * */ // 配置MongoClientURI val urls:MongoClientURI = new MongoClientURI(url) // 连接mongoclient val mongoClient = new MongoClient(urls) // 获取数据库 val db = mongoClient.getDatabase(GlobalConfigUtils.mongoDBname) // 获取集合 val collection = db.getCollection(collectionName) // 删除集合 collection.drop() // 分片式集群URI ,通过配置文件传入参数 val outputUri: String = s"mongodb://" + s"${GlobalConfigUtils.mongoUserName}:${GlobalConfigUtils.mongoPasswd}@${GlobalConfigUtils.mongoHost}:${GlobalConfigUtils.mongoPort},${GlobalConfigUtils.mongoHost2}:${GlobalConfigUtils.mongoPort},${GlobalConfigUtils.mongoHost3}:${GlobalConfigUtils.mongoPort}/${GlobalConfigUtils.mongoDBname}.${collectionName}" // 数据落地mongo // 需要注意Map中的uri是需要collectionName, 也就是mongo中的集合名 df.write.options(Map("spark.mongodb.output.uri"-> outputUri)) .mode("append") .format("com.mongodb.spark.sql") .save() } }