zoukankan      html  css  js  c++  java
  • spark与kafka集成进行实时 nginx代理 这种sdk埋点 原生日志实时解析 处理

    日志格式
    202.108.16.254^A1546795482.600^A/cntv.gif?appId=3&areaId=8213&srcContId=2535575&areaType=1&srcContName=%E5%88%87%E7%89%B9%E9%87%8C%E6%A2%85%E5%BC%80%E4%BA%8C%E5%BA%A6+%E5%8D%B0%E5%BA%A64-1%E5%A4%A7%E8%83%9C%E6%B3%B0%E5%9B%BD%E5%96%9C%E8%BF%8E%E5%BC%80%E9%97%A8%E7%BA%A2&clientChannel=vivo&clientVersion=2.7.2&contId=2535584&serverIp=172.16.42.154&menuId=8212&visitTime=20190107012442630&url=http%3A%2F%2Fm.cctv4g.com%2Fcntv%2Fresource%2Fcltv2%2FdramaDetailPage.jsp%3FcontId%3D2535575%26dataType%3D3%26stats_menuId%3D8212%26stats_areaId%3D8213%26stats_areaType%3D1%26stats_contId%3D2535584%26stats_srcContType%3D3%26stats_srcContId%3D2535575%26wdChannelName%3Dvivo%26wdVersionName%3D2.7.2%26wdClientType%3D1%26wdAppId%3D3%26wdNetType%3D4G%26uuid%3De8fb9e0c-5b59-36f6-80d7-88df323fa750&srcContType=3&appName=CCTV%E6%89%8B%E6%9C%BA%E7%94%B5%E8%A7%86++%EF%BC%88V2%EF%BC%89&netType=4G&areaName=%E6%B5%B7%E6%8A%A5&contName=%E5%88%87%E7%89%B9%E9%87%8C%E6%A2%85%E5%BC%80%E4%BA%8C%E5%BA%A6+%E5%8D%B0%E5%BA%A64-1%E5%A4%A7%E8%83%9C%E6%B3%B0%E5%9B%BD%E5%96%9C%E8%BF%8E%E5%BC%80%E9%97%A8%E7%BA%A2&sessionId=59787199A5F8278836AD26F672743C29&ua=yichengtianxia&en=e_pv&uuid=e8fb9e0c-5b59-36f6-80d7-88df323fa750&clientIp=223.104.105.169&menuName=2019%E5%B9%B4%E9%98%BF%E8%81%94%E9%85%8B%E4%BA%9A%E6%B4%B2%E6%9D%AF&clientType=1
    数据视频审核记录与用户访问记录 进行了实时解析 (demo程序)
    改进:硬编码改为软编码 ,解析构建成解析类,代码优化 与逻辑判断加强(多次测试还未出错)
    1.离线数据后续可将转为dataframe存入hive进行仓库存储进行离线分析(spark core,sql都可以)=》存入mysql进行datav ,或者后端报表
    2.实时存入mysql或者hbase进行实时展示 (前面几篇已经记载了)



    import java.net.URLDecoder
    import java.sql.{Connection, DriverManager}
    
    import com.spark.common.{EventLogConstants, LoggerUtil, Test, TimeUtil}
    import kafka.serializer.StringDecoder
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.log4j.Logger
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.immutable.HashMap
    
    object SxRlStatDemo extends Serializable {
      val logger = Logger.getLogger(classOf[LoggerUtil])
      private val serialVersionUID = -4892194648703458595L
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local[2]").setAppName("sxdemo")
          .set("spark.streaming.kafka.maxRatePerPartition", "100")
          .set("spark.streaming.backpressure.enabled", "true")
        //开启被压
        val sc = SparkContext.getOrCreate(conf)
        val ssc = new StreamingContext(sc, Seconds(1))
    
        // 二、DStream的构建
        // kafka的Simple consumer API的连接参数, 只有两个
        // metadata.broker.list: 给定Kafka的服务器路径信息
        // auto.offset.reset:给定consumer的偏移量的值,largest表示设置为最大值,smallest表示设置为最小值(最大值&最小值指的是对应的分区中的日志数据的偏移量的值) ==> 每次启动都生效
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "hadoop04:9092,hadoop05:9092,hadoop06:9092",
          "auto.offset.reset" -> "largest",
          "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
          "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer")
        //      "spark.serializer"->"org.apache.spark.serializer.KryoSerializer")
        // 给定一个由topic名称组成的set集合
        val topics = Set("topic_bc")
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
          //      .mapog => {
          //
    
          //      })
          .transform(rdd => {
          rdd.map(log => {
            var map: Map[String, String] = new HashMap[String, String]
            val splits = log.split("\^A")
            if (splits.length==3){
            val ip = splits(0).trim
            val nginxTime = TimeUtil.parseNginxServerTime2Long(splits(1).trim).toString;
            if (nginxTime != "-1") {
              nginxTime.toString
            }
            val requestStr = splits(2)
            val index = requestStr.indexOf("?")
            if (index > -1) { // 有请求参数的情况下,获取?后面的参数
              val requestBody: String = requestStr.substring(index + 1)
              var areaInfo = if (ip.nonEmpty) Test.getInfo(ip) else Array("un", "un", "un")
              val requestParames = requestBody.split("&")
              for (e <- requestParames) {
                val index = e.indexOf("=")
                if (index < 1) {
                  logger.debug("次日志无法解析")
                }
                var key = ""; var value = "";
                key = e.substring(0, index)
                value = URLDecoder.decode(e.substring(index + 1), EventLogConstants.LOG_PARAM_CHARSET)
                map.+=(key -> value)
              }
              map.+=("ip" -> ip, "s_time" -> nginxTime, "country" -> areaInfo(0), "provence" -> areaInfo(1), "city" -> areaInfo(2))
            }else{ logger.debug("次日志无法解析")}
            }
            map
          })
    
        })
        stream.cache()
        ssc.checkpoint("checkpoint")
        val bc_personAmt = stream.filter(log => log.contains("en") && log("en") == "e_sx")
          // combine_map.get("test_101").getOrElse("不存在") //根据key取value值,如果不存在返回后面的值
          //  scala> a.get(1)
          // res0: Option[Int] = Some(2) get返回的是Option[Int]类型 不可能等于" " ==Some("e_la")
          .map(log => (log("bc_person"), 1))
          .updateStateByKey[Long]((seq: Seq[Int], state: Option[Long]) => {
          //seq:Seq[Long] 当前批次中每个相同key的value组成的Seq
          val currentValue = seq.sum
          //state:Option[Long] 代表当前批次之前的所有批次的累计的结果,val对于wordcount而言就是先前所有批次中相同单词出现的总次数
          val preValue = state.getOrElse(0L)
          Some(currentValue + preValue)
        })
  • 相关阅读:
    react 和 vue 的优缺点总结
    解决js小数求和出现多位小数问题
    同步循环发请求用promise
    hook中ref使用
    只能输入数字和保留三位的小树
    react添加右键点击事件
    redux
    深拷贝和浅拷贝区别及概念
    pointer-events的css属性。使用该属性可以决定是否能穿透绝对定位元素去触发下面元素的某些行为
    React 使用link在url添加参数(url中不可见)
  • 原文地址:https://www.cnblogs.com/hejunhong/p/10342753.html
Copyright © 2011-2022 走看看