日志量巨大时filebeat占用文件句柄导致磁盘被打满

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 生产环境日志收集集群的一次优化经历

现状:
采用filebeat→logstash→elasticsearch的流程进行业务日志采集,elk版本为6.3.2。
生产系统采用Log4j作为日志系统。
filebeat负责采集log4j输出的业务日志并部署多个节点,单节点单个日志文件test.log大小设置50Mb,日志输出量巨大,几十秒甚至几秒钟就能把50MB打满,然后日志文件被重命名为test.log.1.....test.log.N。由于test.log文件在重命名时filebeat尚未把test.log中的日志全部输送出去(因为此时filebeat data目录中的register文件中还存在对重命名后的文件的索引),导致test.log.1的句柄仍未释放。由此导致当日志量巨大的时候,filebeat来不及处理就会存在大量test.log.N的句柄不释放,而不释放的句柄又会占用大量内存,直至操作系统内存被耗尽。
分析:
业务日志量巨大,下游logstash无法满足吞吐要求,会对上游的filebeat产生反压,反压的后果是导致filebeat收集性能急剧降低,register文件中积压大量重命名之后的log文件。
解决思路:
1、添加Kafka做日志缓冲
2、横向扩充logstash节点
部署Kafka:找了5台配置为CPU16核内存16GB硬盘77GB的机器做Kafka集群(具体部署方式网上一大堆),然后将filebeat的output指向Kafka具体请参考官往https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
部署完成,创建日志topic并将partitions设置为30,重启启动filebeat,运行一段时间之后register文件中的积压现象不复存在,但是观察Kafka中的订阅情况,发现订阅延迟迅速增加,初步判断为logstash吞吐不够。
扩充logstash节点:修改logstash的input为Kafka,具体修改方式请参考官网https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
迅速扩充为3个节点,每个节点开10个线程订阅Kafka中的日志topic。重启logstash,发现Kafka中的延迟现象并未得到缓解,此时怀疑是Kafka分区太少,并发数不够,随即将partitions由30添加至300,并调整每台logstash的线程数为100,观察一段时间之后发现延迟仍未得到缓解。增一度怀疑是logstash性能太差,期间曾出现过几次elasticsearch节点发生OOM而宕机,此时已接近崩溃。后来开始注意到logstash中输出大量的如下INFO日志:
image
意思是发送给elasticsearch的请求被拒绝。此时观察elasticsearch,发现存在大量的error:

2018-11-29T16:39:25,152[o.e.a.b.TransportBulkAction] [data-node3] failed to execute pipeline for a bulk request
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

2018-11-29T16:39:25,161[o.e.x.m.MonitoringService] [data-node3] monitoring execution failed

org.elasticsearch.xpack.monitoring.exporter.ExportException: Exception when closing export bulk

at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1$1.<init>(ExportBulk.java:95) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1.onFailure(ExportBulk.java:93) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:206) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:200) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:96) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:164) ~[?:?]
at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.lambda$doFlush$1(LocalBulk.java:115) ~[?:?]
at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.ContextPreservingActionListener.onFailure(ContextPreservingActionListener.java:50) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$1.onFailure(TransportAction.java:91) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.lambda$processBulkIndexIngestRequest$4(TransportBulkAction.java:502) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService$1.onFailure(PipelineExecutionService.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.onRejection(AbstractRunnable.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.onRejection(ThreadContext.java:715) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:104) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulks

at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:156) ~[?:?]
... 41 more

Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulk [default_local]

... 40 more

Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]

at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
... 31 more

意思也是在拒绝大量请求。才意识到原来瓶颈出现在elasticsearch端,elasticsearch无法响应大批量请求后,对logstash请求进行拒绝。
后来续持续对elasticsearch进行优化后,Kafka中的堆积延迟消费状况得到缓解。此时已经历了一周的时间。
具体elasticsearch是如何调优才缓解该问题的会在下一篇博客中进行记录。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
存储 JSON 监控
可以通过配置Filebeat来将Higress日志持久化到磁盘
【2月更文挑战第10天】可以通过配置Filebeat来将Higress日志持久化到磁盘
27 4
|
3月前
|
缓存 固态存储 关系型数据库
MySQL性能优化指南:深入分析重做日志刷新到磁盘的机制
MySQL性能优化指南:深入分析重做日志刷新到磁盘的机制
|
4月前
|
缓存 关系型数据库 MySQL
MySQL Binlog--事务日志和BINLOG落盘参数对磁盘IO的影响
MySQL Binlog--事务日志和BINLOG落盘参数对磁盘IO的影响
44 0
|
7月前
|
存储 数据采集 安全
通过filebeat、logstash、rsyslog采集nginx日志的几种方式
由于nginx功能强大,性能突出,越来越多的web应用采用nginx作为http和反向代理的web服务器。而nginx的访问日志不管是做用户行为分析还是安全分析都是非常重要的数据源之一。如何有效便捷的采集nginx的日志进行有效的分析成为大家关注的问题。本文通过几个实例来介绍如何通过filebeat、logstash、rsyslog采集nginx的访问日志和错误日志。
257 0
|
6月前
|
存储 NoSQL Redis
容器部署日志分析平台ELK7.10.1(Elasisearch+Filebeat+Redis+Logstash+Kibana)
容器部署日志分析平台ELK7.10.1(Elasisearch+Filebeat+Redis+Logstash+Kibana)
125 0
|
6月前
|
NoSQL Redis 索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
|
2月前
|
文字识别 运维 Oracle
asm 磁盘故障处理日志
asm 磁盘故障处理日志
23 2
|
3月前
|
存储 JSON 运维
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
49 0
|
7月前
|
Java
Filebeat日志采集器实例 1
Filebeat日志采集器实例
74 1
|
3月前
|
监控 NoSQL Redis
ELK7.x日志系统搭建 3. 采用轻量级日志收集Filebeat
ELK7.x日志系统搭建 3. 采用轻量级日志收集Filebeat
103 0

热门文章

最新文章