zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记十之铭文升级版

    铭文一级:

    第八章:Spark Streaming进阶与案例实战

    updateStateByKey算子
    需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态)

    java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().


    需求:将统计结果写入到MySQL
    create table wordcount(
    word varchar(50) default null,
    wordcount int(10) default null
    );

    通过该sql将统计结果写入到MySQL
    insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"

    存在的问题:
    1) 对于已有的数据做更新,而是所有的数据均为insert
    改进思路:
    a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
    b) 工作中:HBase/Redis

    2) 每个rdd的partition创建connection,建议大家改成连接池


    window:定时的进行一个时间段内的数据处理

    window length : 窗口的长度
    sliding interval: 窗口的间隔

    这2个参数和我们的batch size有关系:倍数

    每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc
    ==> 每隔sliding interval统计前window length的值

    铭文二级:

    第七章:Spark Streaming核心概念与编程

    实战:Spark Streaming处理文件系统数据=>

    与处理socket数据类似

    1.建FileWordCount类

    2.建监控的路径,本次为:/Users/rocky/data/imooc/ss

    3.只需修改SocketTextStream成textFileStream

    参数设置为file:///Users/rocky/data/imooc/ss/        /*     前面的“///”、最后的“/”  */

    4.vi test.log  //里面有内容,然后cp到监控的路径

    nc监控6789端口即可

    注意事项:

    官网Basic Sources

    1、必须每次相同的文件格式

    2、必须使用移动的方式将内容move到路径

    3、一旦移动,无法再修改里面的内容

    第八章:Spark Streaming进阶与案例实战

    实战:使用UpdateStateByKey算子统计到目前为止累计出现的单词个数

    copy一个NetworkWordCount类改成StatefulWordCount

    步骤一、将reduceBykey改成UpdateStateByKey

    官网代码(两个重要参数:newValues、running):

    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount = ...  // add the new values with the previous running count to get the new count
        Some(newCount)
    }
    

    步骤二、自定义代码:

    def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
        val current = currentValues.sum
        val pre = preValues.getOrElse(0)
        Some(current + pre)
      }
    

      

    步骤三、修改代码:

    ssc.checkpoint(".")    //一定要设置,运行后文件夹根目录会出现receivedBlockMetadata文件夹

    ps:checkpoint一般生产上设置到HDFS的某个文件夹

    val result = lines.flatMap(_.split(" ")).map((_,1))
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    实战:计算到目前为止累计出现的单词个数写到mysql中:

    ps:mysql知识复习

    mysql -uroot -proot          //登录mysql

    create database imooc_spark;    //建立imooc_spark数据库

    use imooc_spark;          //使用数据库

    show tables;             //查看表

    select * from wordcount;       //查看表内容

    复制一个类文件(删掉UpdateStateByKey算子的相关内容)

    步骤一、copy一个StatefulWordCount类改成ForeachRDDApp类

    停掉之前运行的程序,删掉receivedBlockMetadata的文件内容

    步骤二、在mysq建表wordcount

    word varchar(50) default null,
    wordcount int(10) default null

    步骤三、提供的自定义代码:

    package com.imooc.spark
    import java.sql.DriverManager
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /**
      * 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
      */
    object ForeachRDDApp {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        //result.print()  //此处仅仅是将统计结果输出到控制台
        //TODO... 将结果写入到MySQL
        //    result.foreachRDD(rdd =>{
        //      val connection = createConnection()  // executed at the driver
        //      rdd.foreach { record =>
        //        val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
        //        connection.createStatement().execute(sql)
        //      }
        //    })
        result.print()
        result.foreachRDD(rdd => {
          rdd.foreachPartition(partitionOfRecords => {
            val connection = createConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
              connection.createStatement().execute(sql)
            })
            connection.close()
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
      /**
        * 获取MySQL的连接
        */
      def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark", "root", "root")
      }
    }
    

      报错分析:

    1、connection.createStatement().execute(sql)//没有驱动包,自己引入

    2、第一种官网连接会报序列化错误,自己改成partition式连接,如上面代码

    3、重复执行,mysql数据库的列名会重复出现,自行使用Hbase或redis等数据库

    4、改成连接池的方式

    官网代码参考:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    实战:窗口函数的使用(摘自官网)

    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
    

     

  • 相关阅读:
    pgspider sqlite mysql docker 镜像
    pgspider docker 镜像
    pgspider基于pg 的高性能数据可视化sql 集群引擎
    diesel rust orm 框架试用
    golang 条件编译
    Performance Profiling Zeebe
    bazel 学习一 简单java 项目运行
    一个好用node http keeplive agnet
    gox 简单灵活的golang 跨平台编译工具
    mailhog 作为smtp server mock工具
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8378789.html
Copyright © 2011-2022 走看看