使用JMX监控Kafka

简介: JMX监控Kafka
http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool

 

使用JMX监控Kafka

标签: KafkaJMX监控
  2952人阅读  评论(3)  收藏  举报
category_icon.jpg  分类:
Kafka(8)  arrow_triangle%20_down.jpg

目录(?)[+]

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.
关于监控指标的描述,可以参考: http://kafka.apache.org/documentation.html#monitoring

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
[html]  view plain  copy
 print ?
  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then  
  2.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"  
  3.     export JMX_PORT="9999"  
  4. fi  


通过Jconsole测试时候可以连接





通过JavaAPI来访问


通过以下方法获取目标值
[java]  view plain  copy
 print ? 在CODE上查看代码片 派生到我的代码片
  1. public class KafkaDataProvider{  
  2.     protected final Logger LOGGER = LoggerFactory.getLogger(getClass());  
  3.     private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";  
  4.     private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";  
  5.     private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";  
  6.     private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";  
  7.     private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";  
  8.     private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";  
  9.     private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";  
  10.     private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";  
  11.     public String extractMonitorData() {  
  12.         //TODO 通过调用API获得IP以及参数  
  13.         KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();  
  14.         String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";  
  15.         try {  
  16.             MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);  
  17.             ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);  
  18.             ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);  
  19.             ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);  
  20.             ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);  
  21.             ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);  
  22.             ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);  
  23.             ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);  
  24.             ObjectName partCountObj = new ObjectName(PART_COUNT);  
  25.             Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");  
  26.             Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");  
  27.             Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");  
  28.             Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");  
  29.             Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");  
  30.             Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");  
  31.             Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");  
  32.             Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");  
  33.             monitorDataPoint.setMessagesInPerSec(messagesInPerSec);  
  34.             monitorDataPoint.setBytesInPerSec(bytesInPerSec);  
  35.             monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);  
  36.             monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);  
  37.             monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);  
  38.             monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);  
  39.             monitorDataPoint.setActiveControllerCount(activeControllerCount);  
  40.             monitorDataPoint.setPartCount(partCount);  
  41.         } catch (IOException e) {  
  42.             e.printStackTrace();  
  43.         } catch (MalformedObjectNameException e) {  
  44.             e.printStackTrace();  
  45.         } catch (AttributeNotFoundException e) {  
  46.             e.printStackTrace();  
  47.         } catch (MBeanException e) {  
  48.             e.printStackTrace();  
  49.         } catch (ReflectionException e) {  
  50.             e.printStackTrace();  
  51.         } catch (InstanceNotFoundException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return monitorDataPoint.toString();  
  55.     }  
  56.     public static void main(String[] args) {  
  57.         System.out.println(new KafkaDataProvider().extractMonitorData());  
  58.     }  
  59.     /** 
  60.      * 获得MBeanServer 的连接 
  61.      * 
  62.      * @param jmxUrl 
  63.      * @return 
  64.      * @throws IOException 
  65.      */  
  66.     public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {  
  67.         JMXServiceURL url = new JMXServiceURL(jmxUrl);  
  68.         JMXConnector jmxc = JMXConnectorFactory.connect(url, null);  
  69.         MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();  
  70.         return mbsc;  
  71.     }  
  72. }  

其他工具

除了自己编写定制化的监控程序外


kafka-web-console

https://github.com/claudemamo/kafka-web-console
部署sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader

目录
相关文章
|
4月前
|
消息中间件 SQL 监控
kafka监控
kafka监控
27 0
|
消息中间件 监控 Java
Docker安装Kafka(docker-compose)、EFAK监控
Docker安装Kafka(docker-compose)、EFAK监控
Docker安装Kafka(docker-compose)、EFAK监控
|
消息中间件 Prometheus Kubernetes
K8S的Kafka监控(Prometheus+Grafana)
本文实战操作如何在K8S环境对kafka做监控(Prometheus+Grafana)
567 0
K8S的Kafka监控(Prometheus+Grafana)
|
10月前
|
消息中间件 存储 监控
【Kafka从入门到放弃系列 七】Kafka可视化监控
【Kafka从入门到放弃系列 七】Kafka可视化监控
309 0
BXA
|
11月前
|
消息中间件 存储 Prometheus
Kafka运维与监控
Kafka是由Apache Software Foundation开发的一款分布式流处理平台和消息队列系统 可以处理大规模的实时数据流,具有高吞吐量、低延迟、持久性和可扩展性等优点 常用于数据架构、数据管道、日志聚合、事件驱动等场景,对Kafka的运维和监控十分必要 本文旨在介绍Kafka的运维和监控相关内容
BXA
244 0
|
消息中间件 Prometheus 运维
最佳实践|从Producer 到 Consumer,如何有效监控 Kafka
对于运维人而言,如何安装维护一套监控系统,或如何进行技术选型,从来不是工作重点。如何借助工具对所需的应用、组件进行监控,发现并解决问题才是重中之重。随着 Prometheus 逐渐成为云原生时代可观测标准,为了帮助更多运维人用好 Prometheus,阿里云云原生团队将定期更新 Prometheus 最佳实践系列。第一期我们讲解了《最佳实践|Spring Boot 应用如何接入 Prometheus 监控》,今天将为大家带来,消息队列产品 Kafka 的监控最佳实践。
304 0
最佳实践|从Producer 到  Consumer,如何有效监控 Kafka
|
消息中间件 监控 Kafka
Flume监控文件并将数据输入至Kafka
Flume监控文件并将数据输入至Kafka
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
422 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
43 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
242 0