zoukankan      html  css  js  c++  java
  • mysql变更数据的捕获和入库

    问题:涉及状态的信息,mysql中是update的,缺少中间状态的记录。数据分析中需要这部分数据。

    思路:后端服务通过监控某张表的某个字段,根据mysql的binlog文件,还原数据,发送到kafka。我们消费kafka中的数据,最终在hive的ods层形成表更数据表。

    方案设计

    1. java多线程消费kafka数据直接写入hdfs
      问题:
               (1)会在hdfs形成大量小文件
                 (2)   要监控java程序,维护kafka偏移量等等
    2. spark streaming程序消费kafka实时写入hdfs
      问题:
               (1)想要支持断点续传,要自己维护kafka的偏移量
               (2)线上环境spark环境资源比较吃紧,spark streaming会持续占用较多资源
    3. 每天定时跑批量spark任务,读取kafka,写入hbase,hive上建立hbase映射表
      原因:
               (1)目前配置的hbase重复写入同一条数据会覆盖前一条,如果flume挂掉,任务可以重跑
               (2)可以挑选晚上资源比较充足的时候跑
      问题:
               (1)kafka中数据是持续写入的,所以spark程序不会自动停止,要手动停掉任务
                 (2)  数据写在hbase中,hive的映射表实际读取的还是hbase的数据。所以使用时,最好将数据抽到hive中
    4. 通过flume消费kafka数据,写入hdfs
      最终选择这个方案,因为flume占用资源较少,实时处理的方式也能减少对机器的压力
      并且flume支持check point kafka的偏移量被记录在制定的文件中

    方案4中需要注意的问题:

    1. 线上新的cdh中,缺少flume组件,需要刘亚萌配合
    2. 状态变更最多的sale表和balance表每天会有上千万表更记录,需要对集群的压力测试
    3. 由于需要用到flume的check point来支持断点续传,此种模式要将kafka的channel选为文件模式,将缓存数据存到磁盘。
      缓存文件有两部分,一部分是记录的偏移量,一部分是缓存到磁盘的实际数据,当batch.size达到我们设定的条数时,
      sink将开始数据写到hdfs中。
      (1)要考虑缓存占用的磁盘大小
      (2)要考虑hdfs小文件问题。
      因此要选择合适的batchd.size(数据条数)。尽量使缓存数据大于100m小于128m,让写到hdfs的每个文件和block大小相近。
    4. 数据写入hdfs后,需要通过脚本增加hive表的分区

    待做:
              (1)调研flume使用snappy压缩格式传输
              (2)用双11的数据测试,选择agent个数
                     (如所有topic使用一个agent,每个库使用一个agent,每张表使用一个agent)
              (3)设定合理的batch size(如50000...)

    当然最好可以让上游给我们提供文件,每份文件写到一个日期目录下,这样是最简单的方案。

  • 相关阅读:
    【字符集及字符编码】UTF-8、UTF-16和UTF-32
    【Android】SQLite基本用法(转)
    【eclipse】导入/导出开发环境(包括编辑器字体颜色大小等)
    一个Android Socket的例子(转)
    Linux中与Windows对应的InterlockedIncrement()函数
    Linux互斥锁pthread_mutex_t
    C++读写文本文件
    C++回调函数调用Java接口抽象函数
    Overlapped I/O模型--事件通知【摘录自《Windows网络编程》】
    Linux C++中需要的头文件
  • 原文地址:https://www.cnblogs.com/drjava/p/10447014.html
Copyright © 2011-2022 走看看