zoukankan      html  css  js  c++  java
  • 以spark sql 维护spark streaming offset

    以spark sql 维护spark streaming offset

    三年前的东西了 项目地址 https://github.com/cclient/spark-streaming-kafka-offset-mysql

    spark streming消费kafka的模式

    • Receiver

    • Direct

    Direct 需自定义offset维护的实现,offset通常是写入第三方存储服务

    选型 分布式/高可用 易用性 事务性 易维护性 历史offset方案
    redis 良好 一般 良好 单 key存储,难实现
    mysql 良好 良好 良好 优 表存储,易实现
    zookeeper 一般 一般 一般

    以上几个评价指标只是根据个人需要的评估,并不是绝对的技术概念。redis可以组分布式,mysql可以替换为其他Mpp数据库例如Greenplum,tidb

    方案各有优缺点,个人的项目需求,侧重易用性,和历史offset方案实现,整体上sql类最为合适,具体需求和方案设计,方案优缺点在最后

    github上当时找到了不少的基于mysql维护offset的实现,浏览了下代码结构,发现都是通过引用jdbc来实现,代码量较大(后来又发现了ScalikeJDBC 这是后话了)

    个人早期从hadoop mr迁移到spark,做过hive sql和spark sql的开发工作,当时对spark sql比较熟悉,本能的想用spark sql做,还奇怪为什么没有基于spark sql的实现offset维护的简易方案,而是选用jdbc

    业余实间动手试了试,验证可行,并做了简单封装,相比jdbc的方案 spark sql实现offset的维护一共也没多少行代码,替换掉了原本jdbc的代码

    现在方案是因为针对特定需求的设计

    功能的需求比较类似爬虫系统/数据录入系统,有以下几个特点

    • 1 数据格式稳定,但内容不稳定,对爬虫系统而言,可能因为官方web和api改版,导致结构变更,解析不出完整的结构化信息,需要根据新结构重新消费,数据录入系统的数据也有部分来自于爬虫,也受改版的影响

    • 2 并不严格要求once,html解析操作幂等,后续的入库通常为大数据的upsert请求,同样幂等

    • 3 实时性和稳定性要求不高,通常是1-3小时内,监测出系统异常点,回滚处理即可

    针对功能需求的特点

    • 1 因为要回溯到'consumer处理出现问题的时间点',所以需要维护历史的offset,重新消费问题时间点对应的offset之后的所有数据

    • 2 offset的存储,并不是只对每group保存最新最近的一份(和kafka partition 1:1匹配) ,保存历史的话,这种数据结构对redis和zookeeper不是很友好,即使通过redis和zookeeper实现了读写,redis多key:value, zookeeper 目录数结构的key:value都不直观,通常也需要另外实现外部存储和redis和zookeeper的数据做关系,例如sql

    • 3 会有较为频繁的手动更新offset维护的需求,对redis和zookeeper的内offset的大批量数据做管理(curd),需要额外的开发工作,易用性也比不上mysql

    因此最终选型mysql来做为offset的维护方案,单表保存offset,step累加,新增消费则insert一组offset数据,每次重启服务拉取最新的(step最大)一组offset信息继续消费。额外保存count可以查询某时间段内的实际的消费量,做为消费服务的一个监控补充

    mysql> select * from kfk_offset where datetime>'2019-01-09' and topic='task-response' and `group`='extract';
    +--------+------------+-------+------+-----------+-----------+-----------+-------+---------------------+
    | id     | topic      | group | step | partition | from      | until     | count | datetime            |
    +--------+------------+-------+------+-----------+-----------+-----------+-------+---------------------+
    | 1 | task-response | extract   |  1 |         0 | 1959008 | 1995008 | 36000 | 2019-01-09 00:01:19 |
    | 2 | task-response | extract   |  1 |         1 | 1897546 | 1933546 | 36000 | 2019-01-09 00:01:19 |
    | 0 | task-response | extract   |  1 |         2 | 1876072 | 1912072 | 36000 | 2019-01-09 00:01:19 |
    | 5 | task-response | extract   |  2 |         0 | 1995008 | 2031008 | 36000 | 2019-01-09 00:05:05 |
    | 7 | task-response | extract   |  2 |         1 | 1933546 | 1969546 | 36000 | 2019-01-09 00:05:05 |
    | 6 | task-response | extract   |  2 |         2 | 1912072 | 1948072 | 36000 | 2019-01-09 00:05:05 |
    

    offset数据为增量写入

          storeOffset.toDF("topic", "group", "step", "partition", "from", "until", "count", "datetime")
            .write
            .format("jdbc")
            .mode("append")
            .options(offsetMysqlConf)
            .option("dbtable", offsetTable)
            .save()
    

    其实这时就明白了为什么网上开源的sql类offset的维护,没有基于spark sql的实现,因为spark sql的写操作太过粗糙,对于只存最近一份offset而言spark sql并不友好,只是需求和表结构恰好能良好使用append,另一些可能的原因是,spark sql的支持较晚,初期streaming无法使用spark sql而是应用jdbc,导致延用惯性。初期spark streaming的管道类服务,并不需要sql类功能。还有一个原因是mysql维护offset比较小众,不上不下的感觉,简单的用redis,复杂的用zookeeper

    Scala/Java Python Meaning
    SaveMode.ErrorIfExists(default) "error"(default) When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
    SaveMode.Append "append" When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
    SaveMode.Overwrite "overwrite" Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
    SaveMode.Ignore "ignore" Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

    与增量写入offset相对应的,获取最近offset,这里用mysql的sql直接查失败,本来想写个窗口函数,但时间有限,就分成了两步查询,第一步先查出最近的step,第二步查出该step对应的一组offset,也可以换成jdbc/ScalikeJDBC,这是个优化点

      def getLastestStepAndPartitions(sparkSession: SparkSession, mysqlConf: Map[String, String],offsetTable:String, topic: String, group: String) = {
        sparkSession.sqlContext.read.format("jdbc").options(mysqlConf).option("dbtable", offsetTable).load()
          .createOrReplaceTempView(offsetTable)
        //    todo 失败,暂用两次查询实现
        //    spark.sql("select * from kfk_offset  where step = (select max(step) from kfk_offset)")
        val lastStep = sparkSession
          .sql(s"select max(step) as step from `$offsetTable` where topic='$topic' and `group`='$group'")
          .collect().head.getAs[Int](0)
        val offset = sparkSession
          .sql(s"select * from `$offsetTable`  where topic='$topic' and `group`='$group' ")
          .filter("step=" + lastStep).collect()
          .map(row => {
            (row.getAs[Int]("partition"), row.getAs[Long]("until"))
          })
          .foldLeft[Map[TopicPartition, Long]](Map[TopicPartition, Long]())((map, row) => {
          map.updated(new TopicPartition(topic, row._1), row._2)
        })
        (lastStep, offset)
      }
    

    回溯处理数据,只需删除问题时间点后的对应offset信息即可

    spark sql对mysql的事务性支持差点意思,该方案全是insert,并没有update操作,select和insert也没有冲突,没有影响

    实际使用了有近3年,表现良好

    最主要的一个bug和需求点是事务相关

    例如一个topic有3个partition,正常情况下每step会有3条记录,但是因为mysql和spark服务的稳定性问题会导致最近step的partition信息不完备

    spark streaming 拉到最新批次的3个partition数据,最新step为100 正在往mysql写offset,spark streaming服务手动/异常重启,导致最新的step只写入了2条offset.服务再次启动时通过获取最近的partition的offset时,只获取到了2条,缺少一项partition信息导致服务启动异常。

    这个问题有两种解决办法

    • 1 mysql操作保证事务性,保证step的全批次写入。

    • 2 读取当前次最新的offset,重启时step 100只获取到两条,那对缺失的partition 获取step (100-1)=99 的partition信息

    两个方法除了实现不同外,主要是方案2会少处理一些已处理过的数据,问题只是offset更新不完整,但数据实际已经处理完了,方案1会把3个partion的误差step,重新处理(cpu和Io开销),方案2只会处理step下3个partition内出问题的项,但整体影响也不大

    该方案是特定需求的设计,对不同的spark streaming 流程,可能不是最优的方案

    增量存储会对mysql table 相关表有大量的io,

    重点是因为需求导致的offset增量存储设置思路,初期是想用spark sql试试,实际现在回头看看是否用spark sql倒是次要的

  • 相关阅读:
    Ubuntu 16.04安装Guake Terminal终端(使用一键唤醒功能)
    MySQL查询count(*)、count(1)、count(field)的区别收集
    MySQL查询在一个表而不在另一个表中的数据
    Spring MVC中的拦截器/过滤器HandlerInterceptorAdapter的使用
    Spring mvc解析
    RestTemplate的一个请求过程,mark一下
    福袋开发迭代总结
    Rest分享
    写Markdown费事?Typora让你像写word一样行云流水,所见即所得。
    送你几个用起来很爽的Studio插件
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14476970.html
Copyright © 2011-2022 走看看