Using Logstash for aggregation statistics

Spark Streaming consumes and processes the data, and the statistics are written to MySQL via Spark SQL.

The records are written in append mode:

// Convert the batch RDD to a DataFrame and append it to MySQL over JDBC
val wordsDataFrame = rdd.toDF("supplier", "type", "domain", "pdate", "count", "idate")
wordsDataFrame
  .write
  .format("jdbc")
  .mode("append")
  .options(mysql_conf)
  .save()
Because Spark Streaming processes data in small batches, these records are only intermediate results, so a large number of per-batch statistics rows accumulates.

Table: statistics_temp

id (auto-increment)  domain          idate       count
1                    www.baidu.com   2018-06-13  1
2                    www.baidu.com   2018-06-13  2
3                    www.taobao.com  2018-06-13  3

Getting the per-day, per-domain totals still requires one more round of aggregation.

The original idea was to ship the logs into Elasticsearch via Logstash (the ELK approach) and let Kibana bucket them by time interval, aggregate, and display the results directly.

Now there is a requirement to store the daily totals in a table and display them directly from MySQL.

Aggregating in Kibana and writing back to ES is not an option here, and since statistics_temp is large, running the aggregation against statistics_temp on every query is clearly not feasible.

So the real, effective table statistics has to be derived from this temporary table. This kind of requirement is very common: record a checkpoint, run on a schedule, read the rows added after the checkpoint, aggregate them, and upsert the result.
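For comparison, here is a minimal SQL sketch of what such a hand-written scheduled job would do, assuming the schema described in this post (the checkpoint value 1000 is hypothetical):

-- Sketch only: 1000 stands in for the recorded checkpoint id
INSERT INTO statistics (domain, idate, `count`)
SELECT domain, idate, SUM(`count`)
FROM statistics_temp
WHERE id > 1000
GROUP BY domain, idate
ON DUPLICATE KEY UPDATE `count` = `count` + VALUES(`count`);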

Target table: statistics (domain+idate must have a composite unique index, otherwise ON DUPLICATE KEY UPDATE does not take effect)

id (auto-increment)  domain          idate       count
1                    www.baidu.com   2018-06-13  3
3                    www.taobao.com  2018-06-13  3
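A minimal DDL sketch for the target table; the column types are assumptions, and only the composite unique index on domain+idate is essential for the upsert:

-- Sketch only: column types are assumptions
CREATE TABLE statistics (
  id      BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  domain  VARCHAR(255) NOT NULL,
  idate   DATE NOT NULL,
  `count` BIGINT NOT NULL DEFAULT 0,
  PRIMARY KEY (id),
  UNIQUE KEY uk_domain_idate (domain, idate)
);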

I have been designing and building data pipelines recently and did not want to write this kind of code by hand. Since I am fairly familiar with Logstash, I decided to try a Logstash-based approach, and the experiment turned out to work.

Environment:
logstash 5.5
mysql 5.7

Aggregation requires the official aggregate filter plugin:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-example2
Writing to MySQL requires a third-party output plugin:
https://github.com/theangryangel/logstash-output-jdbc

1. Install the plugins
Logstash 5.5 does not ship with the aggregate filter by default, so it has to be installed separately (other versions untested):
bin/logstash-plugin install logstash-filter-aggregate
The third-party JDBC output plugin is not bundled either and must be installed separately:
bin/logstash-plugin install logstash-output-jdbc

2. Run
Because the jdbc_fetch_size parameter does not take effect (mysql-connector-java-5.1.44-bin.jar falls back to its default fetch behaviour), a large amount of data is pulled in at once, so increase the heap size to avoid an OOM:

export LS_JAVA_OPTS="-Xms4g -Xmx8g"
logstash -f /logstash/statistic.conf

The key part is the content of /logstash/statistic.conf:

input {
  jdbc {
    jdbc_driver_library => "/logstash/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host:3306/db1"
    jdbc_user => "root"
    jdbc_password => "root"
    # Poll once a minute
    schedule => "* * * * *"
    # Track the auto-increment id so each run only reads rows added since the last checkpoint
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    jdbc_fetch_size => 2000
    statement => "SELECT * FROM statistics_temp WHERE id > :sql_last_value"
    clean_run => false
    record_last_run => true
    last_run_metadata_path => "/home/logstash/.statistic"
  }
}

filter {
  aggregate {
    # One aggregation map per domain per day
    task_id => "%{domain}_%{idate}"
    code => "
      map['domain'] = event.get('domain')
      map['idate'] = event.get('idate')
      map['count'] ||= 0
      map['count'] += event.get('count')
      event.cancel()
    "
    # Emit the accumulated map as a new event when a new task_id shows up
    push_previous_map_as_event => true
  }
}

output {
  stdout {
    codec => rubydebug {}
  }
  jdbc {
    driver_jar_path => "/logstash/mysql-connector-java-5.1.44-bin.jar"
    driver_class => "com.mysql.jdbc.Driver"
    connection_string => "jdbc:mysql://host:3306/db1?user=root&password=root"
    statement => [ "INSERT INTO `statistics` (`domain`,`idate`,`count`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE `count` = `count` + ?;", "domain","idate","count","count" ]
  }
}
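To make the upsert concrete, the jdbc output above issues roughly the following for one aggregated event (values taken from the www.baidu.com sample rows, 1 + 2 = 3):

INSERT INTO `statistics` (`domain`,`idate`,`count`)
VALUES ('www.baidu.com', '2018-06-13', 3)
ON DUPLICATE KEY UPDATE `count` = `count` + 3;
-- A later run that aggregates new statistics_temp rows for the same
-- domain and idate hits the unique key and simply adds to the stored count.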


tracking_column => "id"
tracking_column_type => "numeric"
statement => "SELECT * FROM statistics_temp WHERE id > :sql_last_value"
last_run_metadata_path => "/home/logstash/.statistic"

These four parameters configure the checkpoint.

In this example the checkpoint is a numeric auto-increment id.
Sample checkpoint file:
    bash-4.3# cat /home/logstash/.statistic
    --- 127986843

If the tracking column is a timestamp instead:
    tracking_column => "updated_on"
    tracking_column_type => "timestamp"
    statement => "SELECT * from user WHERE updated_on > :sql_last_value"
    last_run_metadata_path => "/home/logstash/.statistic"

Sample checkpoint file:
    bash-4.3# cat /home/logstash/.statistic
    --- 2018-06-13 10:57:59.000000000 +00:00

You can stop the Logstash service, edit the checkpoint file, and start Logstash again to re-aggregate the data recorded after a given checkpoint.


Example
count comparison
statistics_temp:
    mysql> select count(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate      |
+--------------+------------+
|        15290 | 2018-06-16 |
|        27176 | 2018-06-17 |
|        18997 | 2018-06-18 |
|        21785 | 2018-06-19 |
+--------------+------------+
    statistics:
    mysql> select count(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate      |
+--------------+------------+
|          532 | 2018-06-16 |
|          533 | 2018-06-17 |
|          534 | 2018-06-18 |
|          535 | 2018-06-19 |
+--------------+------------+

sum comparison
statistics_temp:
    mysql> select sum(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate      |
+------------+------------+
|     390499 | 2018-06-16 |
|     705807 | 2018-06-17 |
|     462147 | 2018-06-18 |
|     600657 | 2018-06-19 |
+------------+------------+
statistics:
    mysql> select sum(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate      |
+------------+------------+
|     390499 | 2018-06-16 |
|     705807 | 2018-06-17 |
|     462147 | 2018-06-18 |
|     600657 | 2018-06-19 |
+------------+------------+

Compared with statistics_temp, the row count in statistics is much smaller while the sums match, so the aggregation worked as intended.
