zoukankan      html  css  js  c++  java
  • Scala 2.11.8, Spark 2.3.1, MongoDB Spark Connector 2.3.0

    import java.sql.DriverManager
    import com.mongodb.spark._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    /**
      * Spark job that reads the MongoDB collection `saas.elemeterPowerHistory`
      * through the MongoDB Spark connector, keeps the columns `time`, `uuid`,
      * `consume_amount` and `room_id`, and overwrites the table
      * `consume_amount20180831` after switching to the `saas` database.
      *
      * Example submission:
      * {{{
      * spark-submit --driver-class-path /usr/local/jdk/lib/mysql-connector-java-5.1.46.jar \
      *   --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
      *   --class "mongospark20180830consume_amount" /testdata/u3.jar [mongoInputUri]
      * }}}
      *
      * sbt dependency: "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.0"
      */
    object mongospark20180830consume_amount {

      // How to handle records that contain several identical fields:
      //   https://blog.csdn.net/qq_14950717/article/details/62425563
      //   https://blog.csdn.net/qq_27234661/article/details/78344435?locationNum=3&fps=1

      /** Environment variable that may carry the MongoDB input URI. */
      private val InputUriEnvVar = "MONGO_INPUT_URI"

      // SECURITY NOTE(review): this default embeds a user name and password in
      // source code. It is kept only so that existing invocations without
      // arguments keep working; prefer passing the URI as the first program
      // argument or via MONGO_INPUT_URI, and consider rotating this credential.
      private val DefaultInputUri =
        "mongodb://saas:saas2018yundinglixin@dds-m5e6e56a3b0cf7b42784-pub.mongodb.rds.aliyuncs.com:3717/saas.elemeterPowerHistory"

      /**
        * Picks the MongoDB input URI.
        *
        * Precedence: first program argument, then the `MONGO_INPUT_URI`
        * environment variable, then the built-in default. Blank values are skipped.
        *
        * @param args the program arguments passed to `main`
        * @return the URI to hand to the MongoDB Spark connector
        */
      private def resolveInputUri(args: Array[String]): String =
        (args.headOption.toList ++ sys.env.get(InputUriEnvVar).toList)
          .map(_.trim)
          .find(_.nonEmpty)
          .getOrElse(DefaultInputUri)

      /**
        * Entry point: loads the collection, projects four columns and saves
        * them as a table, then stops the Spark session.
        *
        * @param args optional; `args(0)` overrides the MongoDB input URI
        */
      def main(args: Array[String]): Unit = {
        // NOTE(review): the master is hard-coded to local[*]; a value set here
        // takes precedence over `spark-submit --master`. Confirm this is intended.
        val sparkConf = new SparkConf().setAppName("adver").setMaster("local[*]")
        val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

        // Always release the session, even when the read or the write fails.
        try {
          val inputUri = resolveInputUri(args)

          // Connector read options: paginate-by-size partitioner keyed on _id
          // with a partition size setting of 32 MB.
          val readOptions = Map(
            "spark.mongodb.input.uri" -> inputUri,
            "spark.mongodb.input.partitioner" -> "MongoPaginateBySizePartitioner",
            "spark.mongodb.input.partitionerOptions.partitionKey" -> "_id",
            "spark.mongodb.input.partitionerOptions.partitionSizeMB" -> "32"
          )

          val df = spark.read.format("com.mongodb.spark.sql").options(readOptions).load()

          // select already yields columns with exactly these names, so no
          // renaming step is needed afterwards.
          val df2 = df.select("time", "uuid", "consume_amount", "room_id")

          spark.sql("use saas")
          df2.write.mode("overwrite").saveAsTable("consume_amount20180831")

          // Earlier experiments, kept for reference:
          //   df2.foreach(println)
          //   val rddf=spark.sql( "select uuid,from_unixtime(cast(`time`/1000 AS bigint),'yyyyMMddHH'),consume_amount from consume where time>=1533115788000").toDF("uuid", "time","consume_amount")
          //   rddf.write.saveAsTable("consume_amount20180830")
          //   val p=df.printSchema()
          //   val select=spark.sql("select s.sn,s.uuid,e.time,e.consume_amount from staonly2  s join elem e on s.uuid=e.uuid").take(10)
          //   val select=spark.sql("select consume_amount from elem limit 5").take(5)
          //   select.foreach(println)
        } finally {
          spark.stop()
        }
      }
    }
  • 相关阅读:
    Linux Kernel Makefiles Kbuild en
    Android 源码结构分析
    Linux Charger IC 驱动移植总结
    Battery Charging Specification Revision 1.2 中文版本
    OpenCV 3.4.2 Windows系统下的环境搭建(附带opencv_contrib-3.4.2)
    OpenCV 经纬法将鱼眼图像展开
    shell 循环结构
    OpenCV之Mat类使用总结
    shell 条件结构之 if 语句使用总结
    OpenCV Error: Unspecified Error(The Function is not implemented)
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/5691865.html
Copyright © 2011-2022 走看看