zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记十之铭文升级版

    铭文一级:

    第八章:Spark Streaming进阶与案例实战

    updateStateByKey算子
    需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态)

    java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().


    需求:将统计结果写入到MySQL
    create table wordcount(
    word varchar(50) default null,
    wordcount int(10) default null
    );

    通过该sql将统计结果写入到MySQL
    insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"

    存在的问题:
    1) 对于已有的数据做更新,而是所有的数据均为insert
    改进思路:
    a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
    b) 工作中:HBase/Redis

    2) 每个rdd的partition创建connection,建议大家改成连接池


    window:定时的进行一个时间段内的数据处理

    window length : 窗口的长度
    sliding interval: 窗口的间隔

    这2个参数和我们的batch size有关系:倍数

    每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc
    ==> 每隔sliding interval统计前window length的值

    铭文二级:

    第七章:Spark Streaming核心概念与编程

    实战:Spark Streaming处理文件系统数据=>

    与处理socket数据类似

    1.建FileWordCount类

    2.建监控的路径,本次为:/Users/rocky/data/imooc/ss

    3.只需修改SocketTextStream成textFileStream

    参数设置为file:///Users/rocky/data/imooc/ss/        /*     前面的“///”、最后的“/”  */

    4.vi test.log  //里面有内容,然后cp到监控的路径

    nc监控6789端口即可

    注意事项:

    官网Basic Sources

    1、必须每次相同的文件格式

    2、必须使用移动的方式将内容move到路径

    3、一旦移动,无法再修改里面的内容

    第八章:Spark Streaming进阶与案例实战

    实战:使用UpdateStateByKey算子统计到目前为止累计出现的单词个数

    copy一个NetworkWordCount类改成StatefulWordCount

    步骤一、将reduceBykey改成UpdateStateByKey

    官网代码(两个重要参数:newValues、running):

    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount = ...  // add the new values with the previous running count to get the new count
        Some(newCount)
    }
    

    步骤二、自定义代码:

    def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
        val current = currentValues.sum
        val pre = preValues.getOrElse(0)
        Some(current + pre)
      }
    

      

    步骤三、修改代码:

    ssc.checkpoint(".")    //一定要设置,运行后文件夹根目录会出现receivedBlockMetadata文件夹

    ps:checkpoint一般生产上设置到HDFS的某个文件夹

    val result = lines.flatMap(_.split(" ")).map((_,1))
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    实战:计算到目前为止累计出现的单词个数写到mysql中:

    ps:mysql知识复习

    mysql -uroot -proot          //登录mysql

    create database imooc_spark;    //建立imooc_spark数据库

    use imooc_spark;          //使用数据库

    show tables;             //查看表

    select * from wordcount;       //查看表内容

    复制一个类文件(删掉UpdateStateByKey算子的相关内容)

    步骤一、copy一个StatefulWordCount类改成ForeachRDDApp类

    停掉之前运行的程序,删掉receivedBlockMetadata的文件内容

    步骤二、在mysq建表wordcount

    word varchar(50) default null,
    wordcount int(10) default null

    步骤三、提供的自定义代码:

    package com.imooc.spark
    import java.sql.DriverManager
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /**
      * 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
      */
    object ForeachRDDApp {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val lines = ssc.socketTextStream("localhost", 6789)
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        //result.print()  //此处仅仅是将统计结果输出到控制台
        //TODO... 将结果写入到MySQL
        //    result.foreachRDD(rdd =>{
        //      val connection = createConnection()  // executed at the driver
        //      rdd.foreach { record =>
        //        val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
        //        connection.createStatement().execute(sql)
        //      }
        //    })
        result.print()
        result.foreachRDD(rdd => {
          rdd.foreachPartition(partitionOfRecords => {
            val connection = createConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
              connection.createStatement().execute(sql)
            })
            connection.close()
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
      /**
        * 获取MySQL的连接
        */
      def createConnection() = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark", "root", "root")
      }
    }
    

      报错分析:

    1、connection.createStatement().execute(sql)//没有驱动包,自己引入

    2、第一种官网连接会报序列化错误,自己改成partition式连接,如上面代码

    3、重复执行,mysql数据库的列名会重复出现,自行使用Hbase或redis等数据库

    4、改成连接池的方式

    官网代码参考:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    实战:窗口函数的使用(摘自官网)

    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
    

     

  • 相关阅读:
    详解consul的安装和配置
    vs2017
    Android Bitmap 和 ByteArray的互相转换
    android中的byte数组转换(转)
    TensorFlow编译androiddemo
    Linux下命令行安装配置android sdk
    Warning: cast to/from pointer from/to integer of different size
    scribe 搭建遇到的问题
    参数依赖查找(ADL,Argument-dependent lookup)
    模板类继承后找不到父类函数的问题
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8378789.html
Copyright © 2011-2022 走看看