  • Spark Streaming pulls Kafka data, combines Spark SQL DataFrames with Hive for storage and computation, and outputs to MySQL

    This job pulls order data from Kafka with Spark Streaming, stores and computes it with Spark SQL DataFrames on Hive, and writes the results to MySQL.

    The data-cleaning step is fairly involved; there is no way around it, because the upstream sends messy data. Every message is parsed with a regex, and the full set is deduplicated by keeping only the latest record (by time) per order.

    Kafka delivers roughly 50 million records per day and the result has to be refreshed once a minute, which still looks feasible. The only option is to pile on cleanup code to match the messy input.
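    The deduplication itself comes down to a window function: partition the parsed records by order id, order them by createTime descending, and keep only the first row of each partition. Below is a minimal sketch of that pattern using the DataFrame Window API (the DataFrame raw, its columns, and dedupLatest are hypothetical stand-ins for the parsed Kafka records; the actual job further down expresses the same thing in Spark SQL with row_number()):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Keep only the newest record per orderId. `raw` is a hypothetical DataFrame
    // of parsed order records with at least orderId and createTime columns.
    def dedupLatest(raw: DataFrame): DataFrame = {
      val w = Window.partitionBy("orderId").orderBy(col("createTime").desc)
      raw.withColumn("rn", row_number().over(w))
        .filter(col("rn") === 1) // rn = 1 is the latest record for this orderId
        .drop("rn")
    }

    The full streaming job follows.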

    package biReportJob.streaming

    import java.io.InputStreamReader
    import java.sql.DriverManager
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Locale}

    import common.getEnv
    import io.thekraken.grok.api.Grok
    import org.apache.commons.lang3.StringUtils
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.internal.Logging
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
    import org.apache.spark.streaming.kafka010._
    import scripts.mongoEtls_zt.ZtLxjETLDailyPushOrder.dataExtractRegexStr
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
    import scala.util.parsing.json.JSON

    /**
    * Created by roy 2019-08-15.
    * Spark Streaming consumes Kafka; the full order set is deduplicated,
    * keeping the latest record per order.
    * biReportJob.streaming.LxjOrderStreaming
    */
    object LxjOrderStreaming extends Logging {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.streaming.kafka010.DirectKafkaInputDStream").setLevel(Level.WARN)
    Logger.getRootLogger.setLevel(Level.WARN)

    def main(args: Array[String]): Unit = {
    val Array(day_hh, streamingSecondsa) = (args ++ Array(null, null)).slice(0, 2)
    var bootstrap_servers = "192.168.7.14:9092,192.168.7.14:9093,192.168.7.15:9092"
    if ("local".equals(getEnv("env"))) {
    bootstrap_servers = "10.200.102.84:9092"
    }
    val spark = SparkSession
    .builder()
    .appName("kafka2HiveStreaming")
    .enableHiveSupport()
    .getOrCreate()
    var streamingSeconds = 60
    if (StringUtils.isNotBlank(streamingSecondsa)) {
    streamingSeconds = streamingSecondsa.toInt
    }
    val sparkContext: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(streamingSeconds))
    val topics = Array("zt_log")
    val table_after = getEnv("env")
    /**
    * enable.auto.commit: if set to true, offsets are committed to Kafka automatically at the
    * interval configured by auto.commit.interval.ms; here it is false so offsets are committed
    * only after the data has been processed.
    *
    * earliest: the job reads from the earliest available offset, effectively replaying all data still
    * retained in the topic, so a restart re-reads whatever the retention period has kept; whether that
    * means duplicate consumption depends on the retention settings.
    * latest: the job starts from the latest offset, so records produced while it was down are lost;
    * depending on how strict the application's semantics need to be, this may still be acceptable.
    */
    var kafkaParams = Map[String, Object](
    "bootstrap.servers" -> bootstrap_servers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "g1_post_push_reduce_local",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    if ("999".equals(day_hh)) {
    kafkaParams = Map[String, Object](
    "bootstrap.servers" -> bootstrap_servers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "g1_post_push_reduce_local",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean))
    }

    val dateFormate: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    val cal = Calendar.getInstance
    val c_yymmdd = dateFormate.format(cal.getTime)
    println("env=" + getEnv("env") + " day_hh=" + day_hh + " cal.getTime.getTime=" + cal.getTime.getTime + " c_yymmdd=" + c_yymmdd + " bootstrap_servers=" + bootstrap_servers)
    val messagesDStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    // val lines = messages.map(_.value)
    messagesDStream.foreachRDD(rdd => {
    // Get the Kafka offset ranges for this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    import spark.implicits._
    val lines = rdd.map(_.value)
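    // Parse each raw log line with Grok, keep only candao.order.postOrder / candao.order.pushOrder
    // messages, and flatten the useful fields into a comma-separated line.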
    val wordsDataFrame1 = lines.map(line => {
    val max_time: Long = Calendar.getInstance.getTime.getTime / 1000
    var jsonstr = ""
    try {
    val grok = SparkSessionSingleton.getGrokInstance()
    val m = grok.`match`(line)
    m.captures
    jsonstr = m.toJson()
    } catch {
    case e: java.lang.Exception =>
    println("grok parse exception: " + e)
    }
    var rtLine = "msg_null"
    if (StringUtils.isNotBlank(jsonstr)) {
    // println(" m.toJson()=" + jsonstr)
    val rooMap = JSON.parseFull(jsonstr).get.asInstanceOf[Map[String, Any]]
    if (rooMap.contains("flag") && rooMap.contains("msg") && rooMap.contains("actionName") &&
    rooMap.getOrElse("flag", 0).asInstanceOf[Double].toInt == 1) {
    val msg = rooMap.getOrElse("msg", "").toString
    val actionName = rooMap.getOrElse("actionName", "").toString
    if (StringUtils.isNotBlank(msg) && StringUtils.isNotBlank(actionName)) {
    if ("candao.order.postOrder".equals(actionName) || "candao.order.pushOrder".equals(actionName)) {
    val jsonMsg = "{" + msg.replaceFirst(dataExtractRegexStr, "")
    val jsonMap = JSON.parseFull(jsonMsg).get.asInstanceOf[Map[String, Any]]
    // println("jsonMap==" + jsonMap)
    if (jsonMap.get("data").exists(_.isInstanceOf[Map[String, Any]])) {
    val msgData = jsonMap.get("data").get.asInstanceOf[Map[String, Any]]
    val actionName = jsonMap.getOrElse("actionName", "")
    var orderMoney = 0.0
    var orderNo = ""
    var otype = -1
    var orderId = ""
    var orderDate = "20180102"
    if (actionName.equals("candao.order.postOrder")) {
    orderMoney = msgData.getOrElse("merchantPrice", 0.0).asInstanceOf[Double]
    otype = 1
    // Fill in orderId from extId so both message types can be deduplicated uniformly
    orderId = "1" + msgData.getOrElse("extId", "").toString
    // orderDate = msgData.getOrElse("orderDate", "20180101").toString
    orderDate = msgData.getOrElse("createTime", "20180101").toString.substring(0, 8)
    } else if (actionName.equals("candao.order.pushOrder")) {
    orderMoney = msgData.getOrElse("price", 0.0).asInstanceOf[Double]
    orderNo = msgData.getOrElse("orderNo", "").toString
    orderId = "2" + msgData.getOrElse("orderId", "").asInstanceOf[Double].toInt.toString
    orderDate = msgData.getOrElse("orderDate", "20180101").toString.substring(0, 10).replace("-", "")
    otype = 2
    }
    val extId = msgData.getOrElse("extId", "").toString
    val extOrderId = msgData.getOrElse("extOrderId", "").toString
    val extStoreId = msgData.getOrElse("extStoreId", "").toString
    val stype = msgData.getOrElse("type", -899.0).asInstanceOf[Double].toInt
    val payType = msgData.getOrElse("payType", -899.0).asInstanceOf[Double].toInt
    val orderStatus = msgData.getOrElse("orderStatus", -899.0).asInstanceOf[Double].toInt
    val createTime = msgData.getOrElse("createTime", 899).toString.toLong
    println("ok data extId=" + extId)
    rtLine = extId + "," + extOrderId + "," + extStoreId + "," + stype + "," + payType + "," + orderMoney + "," + orderStatus + "," + orderDate + "," + createTime + "," + otype + "," + orderNo + "," + orderId + "," + max_time
    }
    }
    }
    }
    }
    rtLine
    }).filter(row => row != null && !row.equals("msg_null"))
    .map(w => {
    val row_data = w.split(",")
    StreamingStoreEntity(row_data(0), row_data(1), row_data(2), row_data(3).toInt, row_data(4).toInt, row_data(5).toDouble, row_data(6).toInt, row_data(7), row_data(8).toLong, row_data(9).toInt, row_data(10), row_data(11), row_data(12).toLong)
    }).toDS()
    wordsDataFrame1.cache()
    println("wordsDataFrame1 show")
    wordsDataFrame1.show()
    if (!wordsDataFrame1.take(1).isEmpty) {
    wordsDataFrame1.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    wordsDataFrame1.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    if (!tableExists(s"zt.lxj_streaming_order_data_$table_after", wordsDataFrame1.sparkSession)) {
    println("table is not create table lxj_streaming_order_data_")
    wordsDataFrame1.withColumn("part_job", lit(c_yymmdd)).write.mode(SaveMode.Overwrite).partitionBy("part_job").saveAsTable(s"zt.lxj_streaming_order_data_$table_after")
    } else {
    wordsDataFrame1.withColumn("part_job", lit(c_yymmdd)).write.mode(SaveMode.Append).partitionBy("part_job").saveAsTable(s"zt.lxj_streaming_order_data_$table_after")
    }
    val yymmdd = c_yymmdd.replace("-", "")
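    // Dedup within today's partition: keep the newest record per orderId (row_number over createTime desc).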
    val DF1 = spark.sql(
    s"""
    |select t1.* from (
    |select *, row_number() over(partition by orderId ORDER BY createTime desc) rn
    |from zt.lxj_streaming_order_data_$table_after where part_job='$c_yymmdd' and orderDate='$yymmdd'
    |) t1 where t1.rn=1
    """.stripMargin)
    DF1.createOrReplaceTempView("table_tmp_1")
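    // Aggregate per orderDate / extStoreId / otype: order count and total order amount.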
    val sql2 =
    s"""
    |select orderDate,extStoreId,otype,max(jobtime) jobtime,count(1) orderNum,round(sum(orderMoney),4) orderMoney from table_tmp_1
    |group by orderDate,extStoreId,otype
    """.stripMargin
    val DF2 = spark.sql(sql2)
    DF2.createOrReplaceTempView("result_left_city_tmp")
    spark.sqlContext.setConf("spark.sql.crossJoin.enabled", "true")
    val resultCityDF = spark.sql(
    """
    |select t1.*,t2.provinceid,t2.provincename,t2.cityid,t2.cityname,t2.storename from result_left_city_tmp t1
    |left join zt.lxj_store t2
    |on t1.extStoreId=t2.extstoreid
    """.stripMargin)
    resultCityDF.cache()
    // resultCityDF.checkpoint()
    if (!resultCityDF.take(1).isEmpty) {
    savedbresult(resultCityDF)
    } else {
    println("update time")
    updateTime()
    }
    } else {
    println("update time")
    updateTime()
    }
    // some time later, after outputs have completed
    messagesDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    // println(s"========= $time =========")
    })


    // wordsDStream.print()
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
    }


    def savedbresult(dataDF: DataFrame): Unit = {
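    // Rewrite the MySQL result table: delete the existing rows, then batch-insert the freshly
    // aggregated rows and commit in a single transaction (rolled back on failure).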
    val (user, passwd, url) = common.LocalMysqlSettings("finereport.user", "finereport.passwd", "finereport.url", "finereportmysql.properties")
    println("mysql url==" + url)
    val insertSql =
    """
    |INSERT INTO lxj_order_theday_board_streaming_test (`provinceid`, `provincename`, `cityid`, `cityname`,`orderDate`, extStoreId,`orderNum`, `orderMoney`, `dt_time`,otype,storename)
    | VALUES (?,?,?,?,?,?,?,?,?,?,?)
    """.stripMargin
    val conn = DriverManager.getConnection(url, user, passwd)
    conn.setAutoCommit(false)
    try {
    val truncatesql = conn.prepareStatement("delete from lxj_order_theday_board_streaming_test")
    // val truncatesql = conn.prepareStatement("TRUNCATE lxj_order_theday_board")
    truncatesql.execute()
    val dataps = conn.prepareStatement(insertSql)
    val list = dataDF.rdd.collect().toList
    // println("list prin=" + list)
    list.foreach(unit => {
    dataps.setInt(1, unit.getAs[Int]("provinceid"))
    dataps.setString(2, unit.getAs[String]("provincename"))
    dataps.setInt(3, unit.getAs[Int]("cityid"))
    dataps.setString(4, unit.getAs[String]("cityname"))
    dataps.setString(5, unit.getAs[String]("orderDate"))
    dataps.setString(6, unit.getAs[String]("extStoreId"))
    dataps.setLong(7, unit.getAs[Long]("orderNum"))
    dataps.setDouble(8, unit.getAs[Double]("orderMoney"))
    dataps.setInt(9, unit.getAs[Long]("jobtime").toInt)
    dataps.setInt(10, unit.getAs[Int]("otype"))
    dataps.setString(11, unit.getAs[String]("storename"))
    dataps.addBatch()
    })
    dataps.executeBatch()
    conn.commit()
    } catch {
    case e: Exception => {
    e.printStackTrace()
    println(e)
    conn.rollback()
    throw new Exception("mysql error =" + e.getMessage)
    }
    } finally {
    conn.close()
    }
    }

    def updateTime(): Unit = {
    val (user, passwd, url) = common.LocalMysqlSettings("finereport.user", "finereport.passwd", "finereport.url", "finereportmysql.properties")
    val conn = DriverManager.getConnection(url, user, passwd)
    try {
    val truncatesql = conn.prepareStatement("update lxj_order_theday_board_streaming_test set db_uptime=now()")
    truncatesql.execute()
    } catch {
    case e: Exception => {
    println(e)
    throw new Exception("mysql error =" + e.getMessage)
    }
    } finally {
    conn.close()
    }
    }

    def tableExists(table: String, spark: SparkSession) =
    spark.catalog.tableExists(table)
    }

    /** Case class for converting RDD to DataFrame */
    case class Record(word: String)

    case class StreamingStoreEntity(extId: String, extOrderId: String, extStoreId: String, stype: Int, payType: Int, orderMoney: Double, orderStatus: Int, orderDate: String, createTime: Long, otype: Int, orderNo: String, orderId: String, jobTime: Long)

    /** Lazily instantiated singleton instance of SparkSession */
    object SparkSessionSingleton {
    @transient private var instance: SparkSession = _

    @transient private var grok: Grok = _
    def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
    // val conf = new SparkConf().setMaster("yarn").setAppName("LxjOrderStreaming")
    instance = SparkSession
    .builder
    .enableHiveSupport()
    .master("yarn")
    .appName("SparkSessionSingleton")
    .config(sparkConf)
    // .config("spark.files.openCostInBytes", PropertyUtil.getInstance().getProperty("spark.files.openCostInBytes"))
    // .config("hive.metastore.uris","thrift://namenode01.cd:9083")連接到hive元數據庫 --files hdfs:///user/processuser/hive-site.xml 集群上運行需要指定hive-site.xml的位置
    // .config("spark.sql.warehouse.dir","hdfs://namenode01.cd:8020/user/hive/warehouse")
    .getOrCreate()
    }
    instance
    }

    val pattern: String = """\[(?<createTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\] \[%{DATA:level}\] \[%{DATA:className}\] \[%{DATA:methodName}\] \[%{DATA:thread}\] \[%{GREEDYDATA:msg}\] \[%{NUMBER:clientType:int}\] \[%{NUMBER:step:int}\] \[%{NUMBER:flag:int}\] \[%{DATA:ip}\] \[%{DATA:clientIp}\] \[%{NUMBER:costTime:int}\] \[%{DATA:isErr}\] \[%{DATA:errName}\] \[%{DATA:logId}\] \[%{DATA:sysName}\] \[%{DATA:actionName}\] \[%{DATA:apiName}\] \[%{DATA:platformKey}\]"""

    def getGrokInstance(): Grok = {
    if (grok == null) {
    grok = new Grok()
    val inputStream = this.getClass.getClassLoader.getResourceAsStream("patterns.txt")
    grok.addPatternFromReader(new InputStreamReader(inputStream))
    grok.compile(pattern)
    }
    grok
    }
    }


    package biReportJob.streaming

    import java.text.SimpleDateFormat
    import java.util.{Calendar, Locale}

    import common.getEnv
    import org.apache.spark.sql.SparkSession

    object CleamDataJob {
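    // Housekeeping job: drops the streaming table's Hive partition from three days ago.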

    def main(args: Array[String]): Unit = {

    val sparkBuilder = SparkSession.builder()
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -3)
    if ("local".equals(getEnv("env"))) {
    sparkBuilder.master("local[*]").config("hive.metastore.uris", "thrift://hdp02:9083")
    }
    val spark = sparkBuilder
    .appName("CleamDataJob")
    .enableHiveSupport()
    .getOrCreate()
    val dateFormate: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    val c_yymmdd = dateFormate.format(cal.getTime)
    val table_after = getEnv("env")
    println("env=" + getEnv("env") + " c_yymmdd=" + c_yymmdd)
    spark.sql(s"ALTER TABLE zt.lxj_streaming_order_data_$table_after DROP IF EXISTS PARTITION(part_job='$c_yymmdd')")


    }

    // Check the raw data:
    //SELECT sum(orderMoney) from(
    // select t1.* from (
    // select *, row_number() over(partition by orderId ORDER BY createTime desc) rn
    // from zt.lxj_streaming_order_data_stag where part_job='2019-08-16' and orderDate='20190816'
    // ) t1 where t1.rn=1
    //) as tt1
    //
    //SELECT sum(orderMoney) FROM zt.lxj_streaming_order_data_stag where orderdate='20190816' and part_job='2019-08-16'


    }

    ————————————————
    Copyright notice: this is an original article by the CSDN blogger 「java的爪哇」, licensed under the CC 4.0 BY-SA license. Please include a link to the original article and this notice when republishing.
    Original link: https://blog.csdn.net/liangrui1988/article/details/99647065
