zoukankan      html  css  js  c++  java
  • Spark 广播变量的简单使用:与 Oracle 表进行数据关联并取出关联表的部分字段

    1. 这里不粘贴全部代码,只列出主要代码。Spark Streaming 主方法代码:

        // Load the Oracle lookup table on the driver and broadcast it once to all executors.
        val sql = "SELECT * FROM ***_INFO"
        val oracleOperation: DataBaseOperation = new DataBaseOperation()
        val connection: Connection = oracleOperation.getConnectionMethod()
        // getConnectionMethod returns null on failure; fail fast here with a clear message
        // instead of an opaque NullPointerException inside scan.
        require(connection != null, "Oracle connection failed; cannot build the broadcast table")
        // scan materializes the whole table into an immutable Map, so the connection can be released.
        val mapValue: Map[String, String] =
          try oracleOperation.scan(connection, sql)
          finally connection.close()
        // Kept as a var in case the surrounding job re-broadcasts (refreshes) the table — confirm.
        var oracleValue: Broadcast[Map[String, String]] = ssc.sparkContext.broadcast(mapValue)

    主方法中写入 HBase 的部分(与上面的代码片段在同一个文件里):

          // Characters 6..11 of the record are parsed as hex; the decimal string is the lookup key
          // (presumably the MACH_NO stored in the Oracle table — confirm against the record format).
          val MACHNO = Integer.parseInt(r.toString.substring(6, 12), 16)
          val MACHNO3 = String.valueOf(MACHNO)
          val fianlMap: Map[String, String] = Relation.relationOracle(MACHNO3, oracleValue.value)
          val REGTIME = fianlMap("REG_TIME") // registration time (REG_TIME column)
          val LINENO = fianlMap("LINE_NO")   // LINE_NO column
          val BUSNO = fianlMap("BUS_NO")     // BUS_NO column

    2.DataBaseOperation类

    import java.sql.{Connection, DriverManager, ResultSet}
    
    /**
     * JDBC helper that opens the Oracle connection and loads the lookup table
     * that the streaming job broadcasts.
     */
    class DataBaseOperation {

      /**
       * Runs `scanSql` and indexes every row by its MACH_NO column.
       *
       * Each value is `"<first 19 chars of REG_TIME>,<LINE_NO>,<BUS_NO>"`, which is the
       * comma-joined format that `Relation.relationOracle` splits apart again. A null REG_TIME
       * becomes an empty field, and a REG_TIME shorter than 19 characters is kept whole.
       * Null LINE_NO / BUS_NO values are rendered as the text "null", as before.
       * When several rows share a MACH_NO, the last one read wins.
       *
       * The prepared statement and result set are always closed before returning. The
       * `connection` itself is NOT closed; it remains owned by the caller.
       *
       * @param connection open JDBC connection (must not be null)
       * @param scanSql    query whose rows contain MACH_NO, REG_TIME, LINE_NO and BUS_NO
       * @return map from MACH_NO to the joined REG_TIME / LINE_NO / BUS_NO string
       */
      def scan(connection: Connection, scanSql: String): Map[String, String] = {
        val statement = connection.prepareStatement(scanSql)
        try {
          val rs: ResultSet = statement.executeQuery()
          try {
            val rows = Map.newBuilder[String, String]
            while (rs.next()) {
              val machNo = rs.getString("MACH_NO")
              // Keep at most the first 19 characters (presumably "yyyy-MM-dd HH:mm:ss" of a
              // timestamp string — confirm); `take` tolerates short values, Option tolerates null.
              val regTime = Option(rs.getString("REG_TIME")).map(_.take(19)).getOrElse("")
              val lineNo = rs.getString("LINE_NO")
              val busNo = rs.getString("BUS_NO")
              rows += (machNo -> s"$regTime,$lineNo,$busNo")
            }
            rows.result()
          } finally rs.close()
        } finally statement.close()
      }

      /**
       * Opens a JDBC connection to the Oracle instance configured below.
       *
       * @return the open connection, or `null` if loading the driver or connecting fails.
       *         In that case the failure is printed to stdout and its stack trace to stderr.
       */
      def getConnectionMethod(): Connection = {
        val url = "jdbc:oracle:thin:@192.**/***"
        // JDBC driver class name
        val driver = "oracle.jdbc.driver.OracleDriver"
        // NOTE(review): credentials are hard-coded; consider moving them to configuration.
        val username = "***"
        val password = "***"
        try {
          Class.forName(driver)
          println("连接数据库") // connecting to the database
          val connection = DriverManager.getConnection(url, username, password)
          println("==== 广播表")
          connection
        } catch {
          case e: Exception =>
            // e.printStackTrace returns Unit, so it must not be concatenated into the message
            // (the old code printed "数据库连接失败()" and lost the actual error).
            println("数据库连接失败: " + e)
            e.printStackTrace()
            null
        }
      }
    }

    另一个对象文件(Relation):

    object Relation {

      /** Names, in order, of the comma-separated fields stored per MACH_NO in the Oracle map. */
      private val OracleFields = Seq("REG_TIME", "LINE_NO", "BUS_NO")

      /**
       * Looks up `MACHNO` in the broadcast Oracle table and adds its REG_TIME, LINE_NO and BUS_NO.
       *
       * The result is `oracleMap` itself plus those three keys. This is preserved from the
       * original implementation; the visible caller reads only the three added keys.
       * If `MACHNO` is not in `oracleMap`, the map is returned unchanged instead of throwing.
       * The original commented-out code intended a `contains` guard for this case.
       * Empty trailing fields are kept (e.g. an empty BUS_NO becomes ""). If the stored value
       * has fewer than three fields, only the fields that are present are added.
       *
       * @param MACHNO    machine number used as the lookup key
       * @param oracleMap MACH_NO -> "REG_TIME,LINE_NO,BUS_NO" (as built by DataBaseOperation.scan)
       * @return `oracleMap` with REG_TIME / LINE_NO / BUS_NO added when the lookup succeeds
       */
      def relationOracle(MACHNO: String, oracleMap: Map[String, String]): Map[String, String] =
        oracleMap.get(MACHNO).fold(oracleMap) { dataValue =>
          // limit -1 keeps trailing empty strings, which plain split(",") would drop.
          oracleMap ++ OracleFields.zip(dataValue.split(",", -1))
        }
    }

    上面的例子传入一个 MACH_NO 字符串作为关联键;下面的另一个案例传入的是一个 Map(整条解析后的记录)。

    1.主函数

     // Parse the incoming line, enrich it with the broadcast Oracle fields, then write it to HBase.
     val value: Map[String, String] = PackageEntityNine.analysisString(lineValue, 1)
     val fianlMap: Map[String, String] = Relation.relationOracle(value, oracleValue.value)
     val put = HbaseEnter.DepositHbase(fianlMap)
     // NOTE(review): if this runs once per record, the two table settings below are re-applied
     // every time; they presumably belong where `table` is created — confirm.
     // NOTE(review): 1 * 8 * 8 = 64 bytes, so the client-side write buffer fills almost
     // immediately; confirm the intended size. No flush of buffered puts is visible here either.
     table.setAutoFlush(false, false)
     table.setWriteBufferSize(1 * 8 * 8)
     table.put(put)

    2. 主要的关联函数(在另一个文件中),其余代码与上面相同:

    object Relation {     // differs from the single-MACH_NO variant: enriches a whole record map

      /** Names, in order, of the comma-separated fields stored per MACH_NO in the Oracle map. */
      private val OracleFields = Seq("REG_TIME", "LINE_NO", "BUS_NO")

      /**
       * Enriches a parsed Kafka record with REG_TIME, LINE_NO and BUS_NO from the Oracle table.
       *
       * The record's "MACH_NO" value is the lookup key. If the record has no MACH_NO, or the
       * machine is not in `oracleMap`, the record is returned unchanged instead of throwing.
       * Empty trailing fields are kept (e.g. an empty BUS_NO becomes ""). If the stored value
       * has fewer than three fields, only the fields that are present are added.
       *
       * @param kafkaMap  parsed record; expected to contain "MACH_NO"
       * @param oracleMap MACH_NO -> "REG_TIME,LINE_NO,BUS_NO" (as built by DataBaseOperation.scan)
       * @return `kafkaMap` with REG_TIME / LINE_NO / BUS_NO added when the lookup succeeds
       */
      def relationOracle(kafkaMap: Map[String, String], oracleMap: Map[String, String]): Map[String, String] =
        kafkaMap.get("MACH_NO").flatMap(oracleMap.get).fold(kafkaMap) { dataValue =>
          // limit -1 keeps trailing empty strings, which plain split(",") would drop.
          kafkaMap ++ OracleFields.zip(dataValue.split(",", -1))
        }
    }
  • 相关阅读:
    阿里云-Redis-Help-最佳实战:将MySQL数据迁移到Redis
    阿里云-Redis-实战场景:互联网类应用
    阿里云-Redis-实战场景:电商行业类应用
    术语-计算机-性能:RT
    术语-计算机-性能:并发数
    术语-计算机-性能:TPS
    术语-计算机-性能:QPS
    阿里云-Redis-实战场景:游戏服务类应用
    阿里云-Redis-实战场景:视频直播类应用
    文章-依赖注入:《Inversion of Control Containers and the Dependency Injection pattern》
  • 原文地址:https://www.cnblogs.com/kaiwen03/p/9959747.html
Copyright © 2011-2022 走看看