  • Filebeat file-handle retention problem

    These two articles are classic references on the topic:

    https://developer.aliyun.com/article/637173

    https://blog.csdn.net/weixin_34342578/article/details/89585085

    Huge log volume: filebeat holding file handles fills up the disk

    Situation:
    Business logs are collected through a filebeat → logstash → elasticsearch pipeline; the ELK version is 6.3.2.
    The production system uses Log4j as its logging framework.
    filebeat, deployed on multiple nodes, harvests the business logs that log4j writes. On each node the active log file test.log is capped at 50 MB, and output is so heavy that the cap is hit within tens of seconds, sometimes just seconds, after which the file is rotated to test.log.1 ... test.log.N. Because filebeat has not finished shipping test.log's contents when the file is renamed (the registry file in filebeat's data directory still holds state for the renamed file), the handle on test.log.1 is not released. So when log volume is huge and filebeat cannot keep up, handles on a large number of test.log.N files stay open, and those unreleased handles in turn consume large amounts of memory, until the operating system's memory is exhausted.
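    For context, a minimal sketch of the kind of log4j rotation setup described above. Only the 50 MB cap comes from the text; the file path, backup count, and pattern are assumptions:

      # log4j.properties (sketch; File and MaxBackupIndex are hypothetical)
      log4j.rootLogger=INFO, file
      log4j.appender.file=org.apache.log4j.RollingFileAppender
      log4j.appender.file.File=/data/logs/test.log
      # rotate at 50 MB: test.log -> test.log.1 ... test.log.N
      log4j.appender.file.MaxFileSize=50MB
      log4j.appender.file.MaxBackupIndex=20
      log4j.appender.file.layout=org.apache.log4j.PatternLayout
      log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %p [%t] %c - %m%n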
    Analysis:
    The business log volume is huge, and the downstream logstash cannot meet the required throughput, which back-pressures the upstream filebeat. The backpressure causes filebeat's collection performance to drop sharply, and a large backlog of rotated log files piles up in the registry file.
    Approach:
    1. Add Kafka as a log buffer
    2. Scale out the logstash nodes
    Deploying Kafka: we took 5 machines (16-core CPU, 16 GB RAM, 77 GB disk) to build a Kafka cluster (deployment guides abound online), then pointed filebeat's output at Kafka; for details see the official docs at https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html. A config sketch follows.
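    A minimal sketch of the filebeat.yml output section; the broker addresses and topic name are hypothetical, and the tuning values are illustrative rather than what we actually used:

      output.kafka:
        hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
        topic: "biz-log"            # hypothetical topic name
        required_acks: 1            # leader-only ack: trades durability for throughput
        compression: gzip
        max_message_bytes: 1000000  # single events larger than ~1 MB are dropped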
    Once deployment was done, we created the log topic with partitions set to 30 and restarted filebeat. After it ran for a while, the backlog in the registry file was gone, but watching the consumption on the Kafka side we saw consumer lag climbing rapidly; our initial judgment was that logstash throughput was insufficient.
    Scaling out logstash: we switched logstash's input to Kafka; for the specifics see the official docs at https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html. A sketch of the input follows.
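    A sketch of the corresponding logstash kafka input, matching the hypothetical topic above; group_id is also hypothetical, and consumer_threads is the 10-threads-per-node setting described next:

      input {
        kafka {
          bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
          topics            => ["biz-log"]
          group_id          => "logstash-biz-log"
          consumer_threads  => 10   # ideally one consumer thread per partition
        }
      }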
    We quickly scaled out to 3 nodes, each running 10 consumer threads against the log topic. After restarting logstash, the lag in Kafka did not ease. Suspecting that too few partitions were limiting concurrency, we raised partitions from 30 to 300 and set each logstash node's thread count to 100; after watching for a while, the lag still had not eased. At one point we suspected logstash itself was simply too slow, and during this period the elasticsearch nodes went down with OOM several times; we were close to breaking down. Then we started to notice logstash printing large numbers of INFO logs like this:
    [screenshot: logstash INFO logs showing bulk requests rejected by elasticsearch]
    They mean that requests sent to elasticsearch are being rejected. Turning to elasticsearch itself, we found large numbers of errors:

    2018-11-29T16:39:25,152[o.e.a.b.TransportBulkAction] [data-node3] failed to execute pipeline for a bulk request
    org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

      at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
      at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
      at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
      at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
      at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

    2018-11-29T16:39:25,161[o.e.x.m.MonitoringService] [data-node3] monitoring execution failed

    org.elasticsearch.xpack.monitoring.exporter.ExportException: Exception when closing export bulk

      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1$1.<init>(ExportBulk.java:95) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1.onFailure(ExportBulk.java:93) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:206) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:200) ~[?:?]
      at org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:96) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:164) ~[?:?]
      at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.lambda$doFlush$1(LocalBulk.java:115) ~[?:?]
      at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.ContextPreservingActionListener.onFailure(ContextPreservingActionListener.java:50) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction$1.onFailure(TransportAction.java:91) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.lambda$processBulkIndexIngestRequest$4(TransportBulkAction.java:502) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.ingest.PipelineExecutionService$1.onFailure(PipelineExecutionService.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.AbstractRunnable.onRejection(AbstractRunnable.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.onRejection(ThreadContext.java:715) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:104) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
      at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
      at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
      at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
      at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

    Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulks

      at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:156) ~[?:?]
      ... 41 more

    Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulk [default_local]

    ... 40 more

    Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

      at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
      at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
      ... 31 more

    These, too, say that requests are being rejected en masse. Only then did we realize the bottleneck was on the elasticsearch side: unable to keep up with the large volume of bulk requests, elasticsearch was rejecting logstash's requests.
    After sustained optimization of elasticsearch, the consumer lag accumulating in Kafka was finally relieved. A full week had passed by then.
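    The stack traces above already point at the immediate symptom: the write thread pool's queue (queue capacity = 200) was overflowing. As a hedged illustration only, not necessarily the tuning we applied, the queue size in elasticsearch 6.x is a static setting in elasticsearch.yml:

      # elasticsearch.yml (sketch; the value is illustrative)
      # a bigger queue absorbs bursts, but it masks rather than fixes
      # a sustained indexing shortfall
      thread_pool.write.queue_size: 1000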
    How exactly elasticsearch was tuned to relieve the problem will be recorded in the next post.

    Summary: filebeat holding file handles causes disk usage to blow up

    At its root, the producers generate logs far faster than the downstream logstash or elasticsearch can process them.

    And filebeat is greedy: it keeps a file's handle open, and when logs are produced faster than filebeat can hand the data off to the downstream Kafka, those handles pile up.

    If at that point you manually delete the file, filebeat still holds its handle, so the space the file occupied is never given back to the filesystem.

    Reproducing the scenario is actually quite simple (a shell sketch follows the list):

    1. The harvested file must contain more than 4096 lines; in my tests, the problem would not reproduce below that.

    2. Configure filebeat with a deliberately wrong Kafka address, so that filebeat's backlog keeps growing and is never processed downstream.

    3. While the file is still being harvested, manually delete it.
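    A shell sketch of the reproduction; the log path is hypothetical, and filebeat is assumed to be tailing it with its output pointed at the unreachable Kafka address from step 2:

      # step 1: write well over 4096 lines
      seq 1 100000 | sed 's/^/test log line /' > /data/logs/test.log

      # step 3: delete the file while filebeat is still harvesting it
      rm /data/logs/test.log

      # the handle is still held: the deleted file shows up in lsof,
      # and df confirms the space has not been reclaimed
      lsof -p "$(pgrep -f filebeat)" | grep -i deleted
      df -h /data/logs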

    The fix:

    Set the following in the filebeat configuration (in our case, baked into the filebeat image):

    close_timeout: 3h

    If a file's contents are still not fully shipped after 3 hours, the file handle is forcibly closed; this option is the key point for solving the problem in the case above.

    Careful, though: turning this option on avoids the problem above, but it carries a risk of losing data. You cannot have it both ways: log lines that were never successfully sent before the handle closes will be lost.
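    A sketch of where the option lives, assuming a filebeat 6.x log input (the paths are hypothetical):

      filebeat.inputs:
        - type: log
          paths:
            - /data/logs/test.log*
          # force-close the harvester's file handle 3h after it is opened,
          # even if the file has not been fully shipped (risk: data loss)
          close_timeout: 3h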

  • Original post: https://www.cnblogs.com/kebibuluan/p/15665702.html