解决 flume KafkaSink 启动后cpu占用100%的问题

简介: 解决 flume KafkaSink 启动后cpu占用100%的问题 Flume 版本 :1.6.0-cdh5.5.0问题描述:配置kafkasink,将实时数据发送到kafka。

解决 flume KafkaSink 启动后cpu占用100%的问题

 

Flume 版本 :1.6.0-cdh5.5.0

问题描述:

配置kafkasink,将实时数据发送到kafka

Flume启动完成后,没有日志处理时,cpu使用率飙升到100%

当有日志数据处理时,并发稳定时,cpu不定时会有一瞬间飙升。

当日志数据量比较大时,cpu不会飙升。

发现:

使用 jstack -F <pid> > /home/name/flume-dump.log命令,查看flume的堆栈信息

发现很多BLOCKED信息如下:

Thread 16599: (state = BLOCKED)

 - java.util.concurrent.locks.ReentrantReadWriteLock$FairSync.readerShouldBlock() @bci=1, line=695 (Compiled frame; information may be imprecise)

 - java.util.concurrent.locks.ReentrantReadWriteLock$Sync.tryAcquireShared(int) @bci=33, line=470 (Compiled frame)

 - java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(int) @bci=2, line=1282 (Compiled frame)

 - java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock() @bci=5, line=727 (Compiled frame)

 - org.apache.flume.channel.file.Log.lockShared() @bci=4, line=785 (Compiled frame)

 - org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake() @bci=72, line=501 (Compiled frame)

 - org.apache.flume.channel.BasicTransactionSemantics.take() @bci=51, line=113 (Compiled frame)

 - org.apache.flume.channel.BasicChannelSemantics.take() @bci=26, line=95 (Compiled frame)

 - org.apache.flume.sink.kafka.KafkaSink.process() @bci=57, line=97 (Compiled frame)

 - org.apache.flume.sink.DefaultSinkProcessor.process() @bci=4, line=68 (Compiled frame)

 - org.apache.flume.SinkRunner$PollingRunner.run() @bci=24, line=147 (Compiled frame)

 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)

 

分析:

颜色选中部分可以看到线程的执行过程,走了哪些类,哪些方法。

那就去看一下源码吧。

下载flume-ng-1.6.0-cdh5.5.0-src.tar.gz flume的源码包

使用intellj idea打开。

Ctrl+n 搜索第一个类org.apache.flume.SinkRunner

点击 scroll from source

 

定位到这个类,发现是在flume-ng-core包中

接着看这个类中的内部类PollingRunner的run方法

 

这个类执行policy.process,policy的类型时SinkProcessor

,是所有sink的顶层接口,用来执行所有sink的process方法。

While循环中根据process方法返回的sink Status,判断当前channel中的event是否处理完毕。

如果处理完了,当前sink sleep。

如果这个sink没有返回BACKOFF,会一直while死循环的执行policy.process()方法。并判断其返回的状态。

Cpu占用率100%极有可能时死循环造成的,带着这个猜想,我看了一下process的实现类。

DefaultSinkProcessor类的process方法如下

 

那sink时哪个呢?上面的栈信息提示是KafkaSink。来看这个sink的process方法:

@Override
public Status process() throws EventDeliveryException {

// 一开始设置了statusready
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = null;
  Event event = null;
  String eventTopic = null;
  String eventKey = null;

  try {
    long processedEvents = 0;

    transaction = channel.getTransaction();
    transaction.begin();

    messageList.clear();
    for (; processedEvents < batchSize; processedEvents += 1) {

// channel获取event
      event = channel.take();

      if (event == null) {
        // no events available in channel

// 我们flume一启动直接飙升,当前肯定是没有日志的。所以执行到这里 // for退出
        
break;
      }

      byte[] eventBody = event.getBody();
      Map<String, String> headers = event.getHeaders();

      if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
        eventTopic = topic;
      }

      eventKey = headers.get(KEY_HDR);

      if (logger.isDebugEnabled()) {
        logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
          
+ new String(eventBody, "UTF-8"));
        logger.debug("event #{}", processedEvents);
      }

      // create a message and add to buffer
      
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
        (eventTopic, eventKey, eventBody);
      messageList.add(data);

    }
    //  break后到这里。Eventnull,而且我们并没有日志写进来,处理的event

    // 数一定是0,下面的if也不会进
    // publish batch and commit.
    
if (processedEvents > 0) {
      long startTime = System.nanoTime();
      producer.send(messageList);
      long endTime = System.nanoTime();
      counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
      counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
    }
   // 出来if直接事务提交了。肯定也是null,到目前位置没有看到status的改变
    transaction.commit();

  } catch (Exception ex) {
    String errorMsg = "Failed to publish events";
    logger.error("Failed to publish events", ex);
    result = Status.BACKOFF;
    if (transaction != null) {
      try {
        transaction.rollback();
        counter.incrementRollbackCount();
      } catch (Exception e) {
        logger.error("Transaction rollback failed", e);
        throw Throwables.propagate(e);
      }
    }
    throw new EventDeliveryException(errorMsg, ex);
  } finally {
    if (transaction != null) {
      transaction.close();
    }
  }
//  这里竟然直接返回了。。Channel是空的时候返回了。除了判断什么都没做!!!
  return result;
}

 

可以看到,KafkaSink的process的实现,除了第一行代码设置了status为reday。之后就没有对状态进行改变,而PollingRunner的run方法是根据这个status判断当前sink是否需要sleep。Channel中没有event需要处理,当然要sleep啊,不然就是死循环了。只有在有数据的时候,处理数据才不会对cpu造成太大的压力。

这就解释了开头说的数据量打的时候cpu占用并不会太高的原因。

为了再确认一下,这个思路是不是正确的,再看一下flume实现的其他sink。再event为null的时候是怎么处理的。

HDFSEventSink的process方法如下

 

在commit后,判断了txnEventCount(for循环的计数器,循环一次说明处理了一个event)数。如果小于1(表示没有event),返回了BACKOFF。

IrcSink的处理方式:

可以看到这几个sink再event为null的时候,都将status设置为了BACKOFF

 

解决方法:

这个问题再flume的1.7版本中已经解决了。

1.7中的KafkaSink是这样做:


if (event == null) {
    // no events available in channel
    
if(processedEvents == 0) {
        result = Status.BACKOFF;
        counter.incrementBatchEmptyCount();
    } else {
        counter.incrementBatchUnderflowCount();
    }
    break;
}

 

目录
相关文章
|
7月前
|
分布式计算 安全 Hadoop
HBase启动时有进程webUI不显示HRegionServer各种情况解决方案
HBase启动时有进程webUI不显示HRegionServer各种情况解决方案
228 0
|
2月前
|
SQL Java Apache
Flink CPU问题之CPU较低如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
分布式计算 Hadoop
hadoop datanode进程不能启动
hadoop datanode进程不能启动
271 0
hadoop datanode进程不能启动
|
资源调度 调度 容器
Hadoop3.0Yarn添加网络、磁盘IO等资源资料汇总及实战配置遇到的问题和解决办法
Hadoop3.0Yarn添加网络、磁盘IO等资源资料汇总及实战配置遇到的问题和解决办法
175 0
|
资源调度 分布式计算 Hadoop
【Hadoop】万字长文详解Yarn资源隔离
【Hadoop】万字长文详解Yarn资源隔离
397 0
【Hadoop】万字长文详解Yarn资源隔离
|
资源调度 分布式计算 Java
MapReduce作业在YARN的内存分配设置
MapReduce作业在YARN的内存分配设置
313 0
MapReduce作业在YARN的内存分配设置
|
资源调度 分布式计算 Java
YARN and MapReduce的【内存】优化配置详解
在Hadoop2.x中, YARN负责管理MapReduce中的资源(内存, CPU等)并且将其打包成Container。 使之专注于其擅长的数据处理任务, 将无需考虑资源调度.
1433 0
|
存储 Java
Hadoop-NameNode内存预估
NameNode通过NetworkTopology维护整个集群的树状拓扑结构;拓扑结构的叶子节点DatanodeDescriptor是标识DataNode的关键结构。DataNode节点一般会挂载多块不同类型存储单元;StorageMap描述的正是存储介质DatanodeStorageInfo集合(Map默认长度16)。
181 0
|
SQL 资源调度 测试技术
YARN ResourceManager重启作业保留机制
YARN可以通过相关配置支持ResourceManager重启过程中,不影响正在运行的作业,即重启后,作业还能正常继续运行直到结束
7267 0
|
缓存 分布式计算 Spark

热门文章

最新文章