zoukankan      html  css  js  c++  java
  • 通过spark sql 将 hdfs上文件导入到mongodb

    功能:通过spark sql 将hdfs 中文件导入到mongodb

     所需jar包有:mongo-spark-connector_2.11-2.1.2.jar、mongo-java-driver-3.8.0.jar

     scala代码如下: 

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.Dataset
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.SparkSession
    import com.mongodb.spark._
    import org.bson.Document
    import com.mongodb.spark.config._

    /**
     * Command-line job that reads a JSON file from HDFS with Spark SQL and
     * appends its rows to a MongoDB collection via the MongoDB Spark connector.
     */
    object Exec {
      import scala.util.control.NonFatal

      /**
       * Entry point.
       *
       * Expected arguments, in order:
       *  - `hdfsServer`      HDFS URI prefix, e.g. "hdfs://master"
       *  - `logPath`         directory holding the file, e.g. "/user/hdfs/log/"
       *  - `fileName`        JSON file to import, e.g. "2017-05-04.txt"
       *  - `mongoHost`       MongoDB host and port, e.g. "10.15.22.22:27017"
       *  - `mongoDB`         target database name
       *  - `mongoCollection` target collection name
       *
       * Exits with status 1 when fewer than six arguments are given or when the
       * import fails, so that the launcher can detect the failure.
       *
       * @param args command-line arguments as described above
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 6) {
          System.err.println("Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>")
          System.exit(1)
        }
        val hdfsServer = args(0) // "hdfs://master"
        val logPath = args(1) // "/user/hdfs/log/"
        val fileName = args(2) // 2017-05-04.txt
        val mongoHost = args(3) // "10.15.22.22:27017"
        val mongoDB = args(4) // "mongo db"
        val mongoCollection = args(5) //"mongo collection"

        // Drop a trailing slash from logPath so the joined path has no "//".
        val inputPath = hdfsServer + logPath.stripSuffix("/") + "/" + fileName
        val outputUri = s"mongodb://$mongoHost/$mongoDB.$mongoCollection"

        // Values set on SparkConf in code override spark-submit flags, so "local"
        // is only a fallback for when no master was supplied (e.g. run from an
        // IDE). A hard-coded master would silently ignore `--master`.
        val conf = new org.apache.spark.SparkConf()
          .setAppName("SparkImportDataToMongo")
          .setIfMissing("spark.master", "local")
          .set("spark.debug.maxToStringFields", "500")

        val spark = SparkSession.builder().config(conf).getOrCreate()

        val succeeded =
          try {
            val df = spark.read.json(inputPath)
            df.printSchema()
            df.write
              .mode("append")
              .format("com.mongodb.spark.sql.DefaultSource")
              .option("spark.mongodb.output.uri", outputUri)
              .save()
            true
          } catch {
            case NonFatal(ex) =>
              // Print the exception as data, never as a format string: messages
              // containing '%' would otherwise throw from printf.
              System.err.println(s"Import of $inputPath into $outputUri failed: $ex")
              ex.printStackTrace()
              false
          } finally {
            // Always release the Spark context, on success and on failure.
            spark.stop()
          }

        if (!succeeded) System.exit(1)
      }
    }

    在spark 运行目录执行如下命令:

    ./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test


    运行:

    [root@master spark-2.1.1-bin-hadoop2.6]#   ./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
    18/07/20 23:41:13 INFO spark.SparkContext: Running Spark version 2.1.1
    18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls to: root
    18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls to: root
    18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls groups to: 
    18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls groups to: 
    18/07/20 23:41:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    18/07/20 23:41:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 24073.
    18/07/20 23:41:14 INFO spark.SparkEnv: Registering MapOutputTracker
    18/07/20 23:41:14 INFO spark.SparkEnv: Registering BlockManagerMaster
    18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    18/07/20 23:41:14 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9c42a710-559b-4c97-b92a-58208a77afeb
    18/07/20 23:41:14 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
    18/07/20 23:41:14 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    18/07/20 23:41:14 INFO util.log: Logging initialized @1777ms
    18/07/20 23:41:14 INFO server.Server: jetty-9.2.z-SNAPSHOT
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c65a5ef{/jobs,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6b5176f2{/jobs/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b672aa8{/jobs/job,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fab4aff{/jobs/job/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@ec0c838{/stages,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6e46d9f4{/stages/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5cc69cfe{/stages/stage,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@29cfd92b{/stages/stage/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@21c64522{/stages/pool,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7997b197{/stages/pool/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11dee337{/storage,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@460f76a6{/storage/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@55f3c410{/storage/rdd,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11acdc30{/storage/rdd/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@770d4269{/environment,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4a8ab068{/environment/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1922e6d{/executors,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@76a82f33{/executors/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6bab2585{/executors/threadDump,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74bdc168{/executors/threadDump/json,null,AVAILABLE,@Spark}
    18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@644c78d4{/static,null,AVAILABLE,@Spark}
  • 相关阅读:
    进程与线程的一个简单解释
    如何更优雅的写出你的SQL语句
    SQL 性能优化梳理
    如何写出让同事无法维护的代码?
    Linux配置IP常用命令
    Linux中防火墙命令笔记
    蓝牙技术的工作原理及用途
    别死写代码,这 25 条比涨工资都重要
    搞清这些陷阱,NULL和三值逻辑再也不会作妖
    计算机网络:TCP和UDP的对比
  • 原文地址:https://www.cnblogs.com/abcdwxc/p/9344637.html
Copyright © 2011-2022 走看看