1. 完整代码不再全部粘贴，这里只贴出主要代码。Spark Streaming 主方法代码：
// Load the Oracle lookup table (MACH_NO -> "REG_TIME,LINE_NO,BUS_NO", as built by
// DataBaseOperation.scan) and broadcast it through the streaming context's SparkContext.
// NOTE(review): `ssc` is defined outside this snippet — presumably the StreamingContext.
var sql = "SELECT * FROM ***_INFO"
var oracleOperation: DataBaseOperation = new DataBaseOperation()
// getConnectionMethod returns null if the driver could not be loaded or the connection failed.
var connection: Connection = oracleOperation.getConnectionMethod()
// scan fully materializes the table into an immutable Map, so the connection is no longer needed
// afterwards; close it to avoid leaking an Oracle session.
// NOTE(review): if later code in main reuses `connection` (e.g. to refresh the broadcast),
// it must open a new one.
var mapValue: Map[String, String] =
  try oracleOperation.scan(connection, sql)
  finally if (connection != null) connection.close()
var oracleValue: Broadcast[Map[String, String]] = ssc.sparkContext.broadcast(mapValue)
主方法中写入 HBase 的部分（与上面的代码片段位于同一个文件中）：
// Characters 6..11 of the record's string form hold the machine number as hexadecimal;
// decode it and re-render it in decimal, the form used for the MACH_NO keys of the Oracle map.
// NOTE(review): `r` and `oracleValue` are defined outside this snippet.
val machNoHex = r.toString.substring(6, 12)
var MACHNO = Integer.parseInt(machNoHex, 16)
var MACHNO3 = String.valueOf(MACHNO)
// Look up the Oracle fields for this machine in the broadcast table.
var fianlMap: Map[String, String] = Relation.relationOracle(MACHNO3, oracleValue.value)
var REGTIME = fianlMap("REG_TIME") // registration time
var LINENO = fianlMap("LINE_NO")
var BUSNO = fianlMap("BUS_NO")
2.DataBaseOperation类
import java.sql.{Connection, DriverManager, ResultSet}

/** JDBC helper that loads the Oracle lookup table later broadcast by the streaming job. */
class DataBaseOperation {

  /**
   * Runs `scanSql` on `connection` and builds a lookup map keyed by the MACH_NO column.
   *
   * Each value is `"<REG_TIME truncated to 19 chars>,<LINE_NO>,<BUS_NO>"`. A null REG_TIME
   * becomes an empty field. When several rows share a MACH_NO, the last row read wins.
   *
   * The statement and result set are always closed before returning, even on failure.
   * The connection itself is left open and remains the caller's responsibility.
   *
   * @param connection an open JDBC connection
   * @param scanSql    query whose result has MACH_NO, REG_TIME, LINE_NO and BUS_NO columns
   * @return MACH_NO -> comma-joined "REG_TIME,LINE_NO,BUS_NO"
   */
  def scan(connection: Connection, scanSql: String): Map[String, String] = {
    val statement = connection.prepareStatement(scanSql)
    try {
      val resultSet: ResultSet = statement.executeQuery()
      try {
        val builder = Map.newBuilder[String, String]
        while (resultSet.next()) {
          // Columns are read in the same order as before: MACH_NO, REG_TIME, LINE_NO, BUS_NO.
          val machNo = resultSet.getString("MACH_NO")
          // Keep at most the first 19 characters. `take` tolerates short values, where
          // substring(0, 19) would throw; a null REG_TIME yields "".
          val regTime = Option(resultSet.getString("REG_TIME")).fold("")(_.take(19))
          val lineNo = resultSet.getString("LINE_NO")
          val busNo = resultSet.getString("BUS_NO")
          builder += (machNo -> (regTime + "," + lineNo + "," + busNo))
        }
        builder.result()
      } finally resultSet.close()
    } finally statement.close()
  }

  /**
   * Opens a connection to the configured Oracle database.
   *
   * @return the open connection, or `null` if loading the driver or connecting failed.
   *         Failures are logged rather than thrown, so callers must check for `null`.
   */
  def getConnectionMethod(): Connection = {
    // NOTE(review): connection settings and credentials are hard-coded; consider external config.
    val url = "jdbc:oracle:thin:@192.**/***"
    // JDBC driver class name
    val driver = "oracle.jdbc.driver.OracleDriver"
    // user name
    val username = "***"
    // password
    val password = "***"
    try {
      Class.forName(driver)
      println("连接数据库")
      val connection = DriverManager.getConnection(url, username, password)
      println("==== 广播表")
      connection
    } catch {
      case e: Exception =>
        // printStackTrace returns Unit, so print the message and the stack trace separately
        // instead of concatenating the Unit value into the log line.
        println("数据库连接失败: " + e.getMessage)
        e.printStackTrace()
        null
    }
  }
}
另一个文件中的 Relation 对象：
object Relation {

  /**
   * Looks up one machine number in the broadcast Oracle table and splits its value into fields.
   *
   * @param MACHNO    machine number in the same string form as the MACH_NO keys of `oracleMap`
   * @param oracleMap MACH_NO -> "REG_TIME,LINE_NO,BUS_NO" (the format produced by
   *                  DataBaseOperation.scan)
   * @return a map containing exactly the keys REG_TIME, LINE_NO and BUS_NO
   * @throws NoSuchElementException if `MACHNO` is not present in `oracleMap`
   */
  def relationOracle(MACHNO: String, oracleMap: Map[String, String]): Map[String, String] = {
    val dataValue = oracleMap.getOrElse(
      MACHNO,
      throw new NoSuchElementException(s"MACH_NO $MACHNO not found in Oracle lookup table"))
    // A limit of -1 keeps trailing empty fields, so an empty BUS_NO yields "" rather than an
    // ArrayIndexOutOfBoundsException.
    val fields = dataValue.split(",", -1)
    // Only the looked-up fields are returned. Seeding the result with oracleMap, as before,
    // would leak every Oracle row into the result.
    Map(
      "REG_TIME" -> fields(0),
      "LINE_NO" -> fields(1),
      "BUS_NO" -> fields(2))
  }
}
上面的例子以单个 MACH_NO（一个字符串）作为关联键，只取回对应的几个字段；下面的另一个例子则传入整条记录（一个 Map）进行关联。
1.主函数
// Parse the raw input line into a field map (the meaning of the second argument, 1, is not
// visible from here). NOTE(review): `lineValue`, `oracleValue` and `table` are defined outside
// this snippet.
var value: Map[String, String] = PackageEntityNine.analysisString(lineValue, 1)
// Enrich the record with REG_TIME / LINE_NO / BUS_NO looked up by its MACH_NO in the broadcast
// Oracle table.
var fianlMap: Map[String, String] = Relation.relationOracle(value, oracleValue.value)
// Build the write for the enriched record (presumably an HBase Put — confirm in HbaseEnter).
var put = HbaseEnter.DepositHbase(fianlMap)
// NOTE(review): the two table settings below are re-applied on every pass; if this snippet runs
// once per record, consider configuring `table` once where it is created.
// NOTE(review): with auto-flush disabled, buffered puts may only be sent on flush/close — confirm
// that `table` is flushed or closed after the batch.
table.setAutoFlush(false, false)
// NOTE(review): 1 * 8 * 8 = 64. If the unit is bytes (as in the HBase HTable API), the write
// buffer is tiny and flushes almost every put; confirm whether a larger size was intended.
table.setWriteBufferSize(1 * 8 * 8)
table.put(put)
2. 主要的关联函数（位于另一个文件中），其余代码与上例相同：
object Relation {
  // Differs from the single-MACH_NO variant: enriches a whole Kafka record map instead of
  // returning only the looked-up fields.

  /**
   * Adds the Oracle fields for the record's MACH_NO to the Kafka record.
   *
   * @param kafkaMap  parsed Kafka record; must contain a MACH_NO entry
   * @param oracleMap MACH_NO -> "REG_TIME,LINE_NO,BUS_NO" (the format produced by
   *                  DataBaseOperation.scan)
   * @return `kafkaMap` plus REG_TIME, LINE_NO and BUS_NO (overwriting any existing values)
   * @throws NoSuchElementException if MACH_NO is absent from `kafkaMap` or from `oracleMap`
   */
  def relationOracle(kafkaMap: Map[String, String], oracleMap: Map[String, String]): Map[String, String] = {
    val machNo = kafkaMap.getOrElse(
      "MACH_NO",
      throw new NoSuchElementException("Kafka record has no MACH_NO field"))
    val dataValue = oracleMap.getOrElse(
      machNo,
      throw new NoSuchElementException(s"MACH_NO $machNo not found in Oracle lookup table"))
    // A limit of -1 keeps trailing empty fields, so an empty BUS_NO yields "" rather than an
    // ArrayIndexOutOfBoundsException.
    val fields = dataValue.split(",", -1)
    kafkaMap ++ Map(
      "REG_TIME" -> fields(0),
      "LINE_NO" -> fields(1),
      "BUS_NO" -> fields(2))
  }
}