zoukankan      html  css  js  c++  java
  • 流式分析系统实现 之二

    Spark Streaming Mysql Window

      继“流式分析系统实现之一”后采用Window函数对1分钟内的数据进行统计,虽然在一中也已说明但是并没有实践,所以在此篇文章中对Window进行介绍及操作同时把数据存储到Mysql数据库中,这样就可以查看每分钟的数据,以下代码只是模拟和展示,没有具体存一些其它数据如时间戳,批次等。

    一、Spark window 函数
         在《流式分析系统实现》中我们实现了每10秒统计PV、IP的PV、关键词的PV,但是我们如何能同时统计一分钟的PV的数据呢,答案就是使用Window函数。
    Window(窗口函数)

     

    具体以上函数的例子及演示请参考https://www.cnblogs.com/duanxz/p/4408789.html
    本笔记主要是对《流式分析系统实现》的代码进行修改支持窗口函数和数据结果存入到Mysql数据库,所有代码都是在spark-shell中执行。
         
         注意:因为代码是在spark-shell中执行并且其中还使用到mysql数据库,所以在启动spark-shell时一定要加载mysql-connect-java.jar包,具体如下:
         spark-shell --master local[2] --jars /usr/share/java/mysql-connector-java.jar --driver-class-path /usr/share/java/mysql-connector-java.jar
         在spark streaming window 时一定要设计检查点(checkpoint)不然会报错,代码如下:
    //导入类
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds,StreamingContext}
    import java.sql.{PreparedStatement,Connection,DriverManager}
    import java.util.Date
    
    //得到时间戳
    val now = new Date()
    def getCurrent_time(): String = {
        val a = now.getTime
        var str = a+""
        str.substring(0,10)
    }
    //设计计算的周期,单位秒
    val batch = 10
    
    /*
     * 这是bin/spark-shell交互式模式下创建StreamingContext的方法
     * 非交互式请使用下面的方法来创建
     */
    val ssc = new StreamingContext(sc,Seconds(batch))
    
    /*
    // 非交互式下创建StreamingContext的方法
    val conf = new SparkConf().setAppName("NginxAnay")
    val ssc = new StreamingContext(conf, Seconds(batch))
    */
    
    
    /*
     * 创建输入DStream,是文本文件目录类型
     * 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming
     */
    val lines = ssc.textFileStream("hdfs:///spark/streaming")
    
    /*
     * 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果
     */
    
    //窗口方法必须配置checkpoint,可以这样配置:
    ssc.checkpoint("hdfs:///spark/checkpoint")
    
    //1.总pv(这是常规每10秒一个周期的PV统计)
    lines.count().print()
    
    //1.这是每分钟(连续多个周期)一次的PV统计
    lines.countByWindow(Seconds(batch*6),Seconds(batch*6)).print()
    
    //2. 各IP的PV,按PV倒序
    // 空格分隔的第一个字段就是IP
    lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
        rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
        sortByKey(false).
        map(ip_pv => (ip_pv._2,ip_pv._1))
    }).print()
    //2.这是每分钟(连续多个周期)一次的各IP的PV,按pv倒序 ,采用窗口函数统计一分钟的数据
    val IpPairDStream = lines.map(line => {(line.split(" ")(0),1)})
    val IpCountsDStream = IpPairDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,Seconds(batch * 6),Seconds(batch*6))
    val finalDStream = IpCountsDStream.transform(rdd => {
        rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).sortByKey(false).map(ip_pv => (ip_pv._2,ip_pv._1))})
    finalDStream.print()
    
    //3.搜索引擎PV
    val refer = lines.map(_.split(""")(3))
    
    //先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
    //输出(host,query_keys)
    val searchEnginInfo = refer.map(r => {
        val f = r.split('/')
        val searchEngines = Map(
            "www.google.cn" -> "q",
            "www.yahoo.com" -> "p",
            "cn.bing.com" -> "q",
            "www.baidu.com" -> "wd",
            "www.sogou.com" -> "query"
        )
    
        if (f.length > 2) {
            val host = f(2)
    
            if(searchEngines.contains(host)) {
                val query = r.split('?')(1)
                if(query.length > 0) {
                    val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                    if(arr_search_q.length > 0)
                        (host,arr_search_q(0).split('=')(1))
                    else
                        (host,"")
                } else {
                    (host,"")
                }
            } else
                ("","")
        } else
            ("","")
    })
    
    //输出搜索引擎PV
    searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()
    
    //4.关键词PV
    searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()
    
    //5.终端类型PV
    lines.map(_.split(""")(5)).map(agent => {
        val types = Seq("iPhone","Android")
        var r = "Default"
        for (t <- types) {
            if(agent.indexOf(t) != -1)
                r = t
        }
        (r,1)
    }).reduceByKey(_ + _).print()
    
    //6.各页面PV
    val pagePv = lines.map(line => {(line.split(""")(1).split(" ")(1),1)}).reduceByKey(_ + _)
    pagePv.print()
    //6.RDD数据插入mysql数据库
    pagePv.foreachRDD( rdd => {
        rdd.foreachPartition(eachPartition => {
            var conn: Connection = null;
            var stmt: PreparedStatement = null;
            try {
                val url = "jdbc:mysql://192.168.0.58:3306/streaming?characterEncoding=utf8&useSSL=true";
                val user = "streaming";
                val password = "streaming";
                conn = DriverManager.getConnection(url,user,password)
                eachPartition.foreach(record => {
                    val sql = "insert into page_pv(pageName,pageCount) values(?,?)";
                    stmt = conn.prepareStatement(sql);
                    stmt.setString(1,record._1);
                    stmt.setInt(2,record._2);
                    stmt.executeUpdate();
                })
                } catch {
                    case e: Exception => e.printStackTrace()
                } finally {
                    if (stmt != null) {
                        stmt.close()
                    }
                    if (conn != null) {
                        conn.close()
                    }
                }
        })
    })
    
    //6.这是每分钟(连续多个周期)一次的各页面的PV(调用窗口函数)
    val pageDStream = lines.map(line => {(line.split(""")(1).split(" ")(1),1)})
    val pagePairsDStream = pageDStream.reduceByKeyAndWindow((v1:Int,v2:Int) => v1+ v2,Seconds(batch*6),Seconds(batch*6))
    pagePairsDStream.print()
    //6.RDD数据插入mysql数据库
    
    
    //启动计算,等待执行结束(出错或Ctrl+C退出)
    ssc.start()
    ssc.awaitTermination()
    View Code
  • 相关阅读:
    Docker——搭建SFTP
    PicGo——利用PicGo和GitHub搭建免费图床提供给Typecho使用
    GitHub——如何生成Personal access tokens
    leetcode——两数相加【二】
    每天一道面试题——请实现add(1,2)(3)【二】
    leetcode——两数之和【一】
    每天一道面试题——JavaScript的this指向【一】
    PHP——安装ThinkPHP框架报错
    项目代码 if/else 过多,引起程序猿口吐莲花
    JDK9-JDK14 相关新特性说明及使用
  • 原文地址:https://www.cnblogs.com/xiqing/p/9662607.html
Copyright © 2011-2022 走看看