zoukankan      html  css  js  c++  java
  • 流式分析系统实现 之二

    Spark Streaming Mysql Window

      继“流式分析系统实现之一”后采用Window函数对1分钟内的数据进行统计,虽然在一中也已说明但是并没有实践,所以在此篇文章中对Window进行介绍及操作同时把数据存储到Mysql数据库中,这样就可以查看每分钟的数据,以下代码只是模拟和展示,没有具体存一些其它数据如时间戳,批次等。

    一、Spark window 函数
         在《流式分析系统实现》中我们实现了每10秒统计PV、IP的PV、关键词的PV,但是我们如何能同时统计一分钟的PV的数据呢,答案就是使用Window函数。
    Window(窗口函数)

     

    具体以上函数的例子及演示请参考https://www.cnblogs.com/duanxz/p/4408789.html
    本笔记主要是对《流式分析系统实现》的代码进行修改支持窗口函数和数据结果存入到Mysql数据库,所有代码都是在spark-shell中执行。
         
         注意:因为代码是在spark-shell中执行并且其中还使用到mysql数据库,所以在启动spark-shell时一定要加载mysql-connect-java.jar包,具体如下:
         spark-shell --master local[2] --jars /usr/share/java/mysql-connector-java.jar --driver-class-path /usr/share/java/mysql-connector-java.jar
         在spark streaming window 时一定要设计检查点(checkpoint)不然会报错,代码如下:
    //导入类
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds,StreamingContext}
    import java.sql.{PreparedStatement,Connection,DriverManager}
    import java.util.Date
    
    //得到时间戳
    val now = new Date()
    def getCurrent_time(): String = {
        val a = now.getTime
        var str = a+""
        str.substring(0,10)
    }
    //设计计算的周期,单位秒
    val batch = 10
    
    /*
     * 这是bin/spark-shell交互式模式下创建StreamingContext的方法
     * 非交互式请使用下面的方法来创建
     */
    val ssc = new StreamingContext(sc,Seconds(batch))
    
    /*
    // 非交互式下创建StreamingContext的方法
    val conf = new SparkConf().setAppName("NginxAnay")
    val ssc = new StreamingContext(conf, Seconds(batch))
    */
    
    
    /*
     * 创建输入DStream,是文本文件目录类型
     * 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming
     */
    val lines = ssc.textFileStream("hdfs:///spark/streaming")
    
    /*
     * 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果
     */
    
    //窗口方法必须配置checkpoint,可以这样配置:
    ssc.checkpoint("hdfs:///spark/checkpoint")
    
    //1.总pv(这是常规每10秒一个周期的PV统计)
    lines.count().print()
    
    //1.这是每分钟(连续多个周期)一次的PV统计
    lines.countByWindow(Seconds(batch*6),Seconds(batch*6)).print()
    
    //2. 各IP的PV,按PV倒序
    // 空格分隔的第一个字段就是IP
    lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
        rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
        sortByKey(false).
        map(ip_pv => (ip_pv._2,ip_pv._1))
    }).print()
    //2.这是每分钟(连续多个周期)一次的各IP的PV,按pv倒序 ,采用窗口函数统计一分钟的数据
    val IpPairDStream = lines.map(line => {(line.split(" ")(0),1)})
    val IpCountsDStream = IpPairDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,Seconds(batch * 6),Seconds(batch*6))
    val finalDStream = IpCountsDStream.transform(rdd => {
        rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).sortByKey(false).map(ip_pv => (ip_pv._2,ip_pv._1))})
    finalDStream.print()
    
    //3.搜索引擎PV
    val refer = lines.map(_.split(""")(3))
    
    //先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
    //输出(host,query_keys)
    val searchEnginInfo = refer.map(r => {
        val f = r.split('/')
        val searchEngines = Map(
            "www.google.cn" -> "q",
            "www.yahoo.com" -> "p",
            "cn.bing.com" -> "q",
            "www.baidu.com" -> "wd",
            "www.sogou.com" -> "query"
        )
    
        if (f.length > 2) {
            val host = f(2)
    
            if(searchEngines.contains(host)) {
                val query = r.split('?')(1)
                if(query.length > 0) {
                    val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                    if(arr_search_q.length > 0)
                        (host,arr_search_q(0).split('=')(1))
                    else
                        (host,"")
                } else {
                    (host,"")
                }
            } else
                ("","")
        } else
            ("","")
    })
    
    //输出搜索引擎PV
    searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()
    
    //4.关键词PV
    searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()
    
    //5.终端类型PV
    lines.map(_.split(""")(5)).map(agent => {
        val types = Seq("iPhone","Android")
        var r = "Default"
        for (t <- types) {
            if(agent.indexOf(t) != -1)
                r = t
        }
        (r,1)
    }).reduceByKey(_ + _).print()
    
    //6.各页面PV
    val pagePv = lines.map(line => {(line.split(""")(1).split(" ")(1),1)}).reduceByKey(_ + _)
    pagePv.print()
    //6.RDD数据插入mysql数据库
    pagePv.foreachRDD( rdd => {
        rdd.foreachPartition(eachPartition => {
            var conn: Connection = null;
            var stmt: PreparedStatement = null;
            try {
                val url = "jdbc:mysql://192.168.0.58:3306/streaming?characterEncoding=utf8&useSSL=true";
                val user = "streaming";
                val password = "streaming";
                conn = DriverManager.getConnection(url,user,password)
                eachPartition.foreach(record => {
                    val sql = "insert into page_pv(pageName,pageCount) values(?,?)";
                    stmt = conn.prepareStatement(sql);
                    stmt.setString(1,record._1);
                    stmt.setInt(2,record._2);
                    stmt.executeUpdate();
                })
                } catch {
                    case e: Exception => e.printStackTrace()
                } finally {
                    if (stmt != null) {
                        stmt.close()
                    }
                    if (conn != null) {
                        conn.close()
                    }
                }
        })
    })
    
    //6.这是每分钟(连续多个周期)一次的各页面的PV(调用窗口函数)
    val pageDStream = lines.map(line => {(line.split(""")(1).split(" ")(1),1)})
    val pagePairsDStream = pageDStream.reduceByKeyAndWindow((v1:Int,v2:Int) => v1+ v2,Seconds(batch*6),Seconds(batch*6))
    pagePairsDStream.print()
    //6.RDD数据插入mysql数据库
    
    
    //启动计算,等待执行结束(出错或Ctrl+C退出)
    ssc.start()
    ssc.awaitTermination()
    View Code
  • 相关阅读:
    【Anagrams】 cpp
    【Count and Say】cpp
    【Roman To Integer】cpp
    【Integer To Roman】cpp
    【Valid Number】cpp
    重构之 实体与引用 逻辑实体 逻辑存在的形式 可引用逻辑实体 不可引用逻辑实体 散弹式修改
    Maven项目聚合 jar包锁定 依赖传递 私服
    Oracle学习2 视图 索引 sql编程 游标 存储过程 存储函数 触发器
    mysql案例~tcpdump的使用
    tidb架构~本地化安装
  • 原文地址:https://www.cnblogs.com/xiqing/p/9662607.html
Copyright © 2011-2022 走看看