zoukankan      html  css  js  c++  java
  • 大数据中各种框架的连接器(Spark, Flink, MongoDB, Kafka, Hive, Hbase等)

    本文不支持复制粘贴的转载,鼓励修改和扩展后的转载。转载前必须邮件取得本人同意,联系方式:1505514388@qq.com

    各种语言和框架连接mongodb的基础代码

    使用Java客户端连接mongodb

            //获取mongo客户端
            MongoClient mongoClient=new MongoClient("localhost");
            //获取数据库
            MongoDatabase database=mongoClient.getDatabase("test_database");
            //获取集合
            MongoCollection<Document>collection=database.getCollection("test_database");
            //查询集合中的第一个元素
            Document myDoc=collection.find().first();
            System.out.println(myDoc);
            //关闭资源
            mongoClient.close();
    

    使用scala客户端连接mongodb

        val mongoClient = MongoClient("mongodb://localhost:27017")
        val database = mongoClient.getDatabase("test_basedata")
        val collection = database.getCollection("test_basedata")
        val document = collection.find().first()
        System.out.println(document.toString)
        mongoClient.close()
    

    使用spark连接mongodb

    在单纯的使用scala的mongodb的连接器时遇到了以下报错:

    com.mongodb.ConnectionString.getThreadsAllowedToBlockForConnectionMultiplier()Ljava/lang/Integer;
    

    解决办法为:

    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.10.0</version>
    </dependency>
    

    具体原因未知
    spark的连接器的代码如下:

        //TODO 开启环境
        val spark = SparkSession.builder().master("local")
          .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test_database.test_database")
          .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test_database.test_database")
          .getOrCreate()
       //TODO 数据操作
        val testDF = MongoSpark.load(spark)
        testDF.show(20)
        //TODO 关闭环境
        spark.close()
    

    使用flink将mongodb作为数据源

    在flink中没有将mongodb作为数据源的,所以下面使用的依赖也是第三方连接器。
    在一般情况下,也不会遇到将mongodb作为flink的数据源。
    所需要的依赖:

    <!-- https://mvnrepository.com/artifact/org.mongodb/casbah-core -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>3.1.1</version>
    </dependency>
    

    下面是自定义的source

    package com.myFlink.test
    
    import com.mongodb.{BasicDBObject, MongoClientURI, casbah}
    import com.mongodb.casbah.{MongoClient, MongoClientURI}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    
    class MongodbSource extends RichSourceFunction[User]{
      //创建mongodb数据源时运行
      var client:MongoClient=_
      override def open(parameters: Configuration): Unit = {
        //连接本地mongodb
        client= MongoClient(casbah.MongoClientURI("mongodb://localhost:27017"))
      }
      //实时运行的函数
      override def run(sourceContext: SourceFunction.SourceContext[User]): Unit = {
        //TODO 获取数据库
        val database = client("test_database")
        //TODO 获取集合
        val coll = database("test_database")
        //TODO 取出数据
        val query = new BasicDBObject("date", "20171203")
        val cursorType = coll.find(query)
        if(cursorType.nonEmpty){
          val oneData = cursorType.next() //拿出一条数据
          sourceContext.collect(
            User(
              name=oneData.get("name").toString,
              date=oneData.get("date").toString
            )
          )
        }
      }
      //结束时的函数
      override def cancel(): Unit ={
        if(client!=null){
          client.close()
        }
      }
    }
    
    case class User(name:String,date:String)
    

    然后向本地的mongdb中插入数据:

    db.getCollection("test_database").insert({"name":"kone", "date":"20171203"})
    

    然后输出:

    4> User(kone,20171203)
    

    使用flink将mongodb作为sink

    package com.myFlink.test
    import com.mongodb.BasicDBObject
    import com.mongodb.casbah.{MongoClient, MongoClientURI}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    
    class MongoDBSink extends RichSinkFunction[User]{
      var client:MongoClient=_
      override def open(parameters: Configuration): Unit = {
        client=MongoClient(MongoClientURI("mongodb://localhost:27017"))
      }
    
      override def invoke(value: User): Unit = {
        //TODO 获取数据库
        val database = client("test_database")
        //TODO 获取集合
        val coll = database("test_database")
        //TODO 数据操作
        val obj = new BasicDBObject("name", value.name).append("date", value.date)
        coll.insert(obj)
      }
    
      override def close(): Unit = {
        if(client!=null){
          client.close()
        }
      }
    }
    

    还没完成。

  • 相关阅读:
    jumpserver-1.4.8安装步骤
    堡垒机使用普通用户密钥方式登陆资产,然后新创建普通用户管理资产
    使用python的subprocess模块调用linux系统命令
    MySQL数据表操作命令
    部署SonarQube代码检测服务并结合Jenkins使用
    使用Jenkins结合Gogs和SonarQube对项目代码进行测试、部署、回滚,以及使用keepalived+haproxy调度至后端tomcat
    Gogs官方帮助文档
    Centos7 用gogs搭建git仓库
    通过设置访问密码查看Tomcat服务器运行状态
    在CentOS 7系统下升级 Jenkins版本
  • 原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14799513.html
Copyright © 2011-2022 走看看