Kafka监控-JMX自定义监控以及常用监控工具比较 - Go语言中文社区

Kafka监控-JMX自定义监控以及常用监控工具比较


目录:

一、通过JMX自定义监控

1、jconsole 

2、Java监控代码:

二、Kafka三款监控工具比较(转载)

1、Kafka Web Conslole

2、Kafka Manager

3、KafkaOffsetMonitor

 

一、通过JMX自定义监控

通过JMX监控可以看到的数据有:

  • broker数据指标
  • topic数据指标
  • 每个partition的数据指标
  • consumer消费滞后情况等。

1、jconsole

利用jconsole 工具:(可通过jconsole,找到Mbean对应的指标,鼠标悬浮指标上方就能找到代码查询所需的ObjectName。)

  • 本地直接连接kafka进程

  • 通过远程连接进程:service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi(启动kafka时需开通JMX端口)


这里讨论的kafka版本是0.8.1.x和0.8.2.x,这两者在使用jmx监控时会有差异,差异体现在ObjectName之中 。

所以在本程序中通过Boolean类型的newKafkaVersion来区别对待。

为确定使用者的objectName,可以利用jconsole工具,找到Mbean对应的指标,鼠标悬浮指标上方就能找到代码查询所需的objectName。

 

2、Java代码

(推荐代码原创《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注作者的微信公众号:朱小厮的博客。

启动kafka时,需启动jmx端口:JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties & 

 

JmxMgr.class

package monitor;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

/**

* Created by hidden on 2016/12/8.

*/

public class JmxMgr {

private static Logger log = LoggerFactory.getLogger(JmxMgr.class);

private static List<JmxConnection> conns = new ArrayList<>();

 

public static boolean init(List<String> ipPortList, boolean newKafkaVersion){

for(String ipPort:ipPortList){

log.info("init jmxConnection [{}]",ipPort);

JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);

boolean bRet = conn.init();

if(!bRet){

log.error("init jmxConnection error");

return false;

}

conns.add(conn);

}

return true;

}

 

public static long getMsgInCountPerSec(String topicName){

long val = 0;

for(JmxConnection conn:conns){

long temp = conn.getMsgInCountPerSec(topicName);

val += temp;

}

return val;

}

 

public static double getMsgInTpsPerSec(String topicName){

double val = 0;

for(JmxConnection conn:conns){

double temp = conn.getMsgInTpsPerSec(topicName);

val += temp;

}

return val;

}

 

public static Map<Integer, Long> getEndOffset(String topicName){

Map<Integer,Long> map = new HashMap<>();

for(JmxConnection conn:conns){

Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);

if(tmp == null){

log.warn("get topic endoffset return null, topic {}", topicName);

continue;

}

for(Integer parId:tmp.keySet()){//change if bigger

if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){

map.put(parId, tmp.get(parId));

}

}

}

return map;

}

 

public static void main(String[] args) {

List<String> ipPortList = new ArrayList<>();

ipPortList.add("localhost:9999");

//ipPortList.add("xx.101.130.2:9999");

JmxMgr.init(ipPortList,true);

 

String topicName = "demo2";

System.out.println(getMsgInCountPerSec(topicName));

System.out.println(getMsgInTpsPerSec(topicName));

System.out.println(getEndOffset(topicName));

}

}

JmxConnection.class

package monitor;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import javax.management.*;

import javax.management.remote.JMXConnector;

import javax.management.remote.JMXConnectorFactory;

import javax.management.remote.JMXServiceURL;

import java.io.IOException;

import java.net.MalformedURLException;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

 

/**

* Created by hidden on 2016/12/8.

*/

public class JmxConnection {

private static Logger log = LoggerFactory.getLogger(JmxConnection.class);

 

private MBeanServerConnection conn;

private String jmxURL;

private String ipAndPort = "localhost:9999";

private int port = 9999;

private boolean newKafkaVersion = false;

 

public JmxConnection(Boolean newKafkaVersion, String ipAndPort){

this.newKafkaVersion = newKafkaVersion;

this.ipAndPort = ipAndPort;

}

 

public boolean init(){

jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";

log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);

try {

JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);

JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);

conn = connector.getMBeanServerConnection();

if(conn == null){

log.error("get connection return null!");

return false;

}

} catch (MalformedURLException e) {

e.printStackTrace();

return false;

} catch (IOException e) {

e.printStackTrace();

return false;

}

return true;

}

 

public String getTopicName(String topicName){

String s;

if (newKafkaVersion) {

s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;

} else {

s = ""kafka.server":type="BrokerTopicMetrics",name="" + topicName + "-MessagesInPerSec"";

}

return s;

}

 

/**

* @param topicName: topic name, default_channel_kafka_zzh_demo

* @return 获取发送量(单个broker的,要计算某个topic的总的发送量就要计算集群中每一个broker之和)

*/

public long getMsgInCountPerSec(String topicName){

String objectName = getTopicName(topicName);

Object val = getAttribute(objectName,"Count");

String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;

if(val !=null){

log.info("{}, Count:{}",debugInfo,(long)val);

return (long)val;

}

return 0;

}

 

/**

* @param topicName: topic name, default_channel_kafka_zzh_demo

* @return 获取发送的tps,和发送量一样如果要计算某个topic的发送量就需要计算集群中每一个broker中此topic的tps之和。

*/

public double getMsgInTpsPerSec(String topicName){

String objectName = getTopicName(topicName);

Object val = getAttribute(objectName,"OneMinuteRate");

if(val !=null){

double dVal = ((Double)val).doubleValue();

return dVal;

}

return 0;

}

 

private Object getAttribute(String objName, String objAttr)

{

ObjectName objectName =null;

try {

objectName = new ObjectName(objName);

} catch (MalformedObjectNameException e) {

e.printStackTrace();

return null;

}

return getAttribute(objectName,objAttr);

}

 

private Object getAttribute(ObjectName objName, String objAttr){

if(conn== null)

{

log.error("jmx connection is null");

return null;

}

 

try {

return conn.getAttribute(objName,objAttr);

} catch (MBeanException e) {

e.printStackTrace();

return null;

} catch (AttributeNotFoundException e) {

e.printStackTrace();

return null;

} catch (InstanceNotFoundException e) {

e.printStackTrace();

return null;

} catch (ReflectionException e) {

e.printStackTrace();

return null;

} catch (IOException e) {

e.printStackTrace();

return null;

}

}

 

/**

* @param topicName

* @return 获取topicName中每个partition所对应的logSize(即offset)

*/

public Map<Integer,Long> getTopicEndOffset(String topicName){

Set<ObjectName> objs = getEndOffsetObjects(topicName);

if(objs == null){

return null;

}

Map<Integer, Long> map = new HashMap<>();

for(ObjectName objName:objs){

int partId = getParId(objName);

Object val = getAttribute(objName,"Value");

if(val !=null){

map.put(partId,(Long)val);

}

}

return map;

}

 

private int getParId(ObjectName objName){

if(newKafkaVersion){

String s = objName.getKeyProperty("partition");

return Integer.parseInt(s);

}else {

String s = objName.getKeyProperty("name");

 

int to = s.lastIndexOf("-LogEndOffset");

String s1 = s.substring(0, to);

int from = s1.lastIndexOf("-") + 1;

 

String ss = s.substring(from, to);

return Integer.parseInt(ss);

}

}

 

private Set<ObjectName> getEndOffsetObjects(String topicName){

String objectName;

if (newKafkaVersion) {

objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";

}else{

objectName = ""kafka.log":type="Log",name="" + topicName + "-*-LogEndOffset"";

}

ObjectName objName = null;

Set<ObjectName> objectNames = null;

try {

objName = new ObjectName(objectName);

objectNames = conn.queryNames(objName,null);

} catch (MalformedObjectNameException e) {

e.printStackTrace();

return null;

} catch (IOException e) {

e.printStackTrace();

return null;

}

 

return objectNames;

}

}

 

二、Kafka三款监控工具比较

(转载自蓝色天堂博客,本文链接地址:http://hadoop1989.com/2015/09/22/Kafka-Monitor_Compare)

之前的博客中,介绍了Kafka Web Console这个监控工具,在生产环境中使用,运行一段时间后,发现该工具会和Kafka生产者、消费者、ZooKeeper建立大量连接,从而导致网络阻塞。并且这个Bug也在其他使用者中出现过,看来使用开源工具要慎重!该Bug暂未得到修复,不得已,只能研究下其他同类的Kafka监控软件。

通过研究,发现主流的三种kafka监控程序分别为:

  1. Kafka Web Conslole
  2. Kafka Manager
  3. KafkaOffsetMonitor

现在依次介绍以上三种工具:

1、Kafka Web Conslole

使用Kafka Web Console,可以监控:

  • Brokers列表、Kafka 集群中 Topic列表,及对应的Partition、LogSize等信息
  • 点击Topic,可以浏览对应的Consumer Groups、Offset、Lag等信息
  • 生产和消费流量图、消息预览…

程序运行后,会定时去读取kafka集群分区的日志长度,读取完毕后,连接没有正常释放,一段时间后产生大量的socket连接,导致网络堵塞。

 

2、Kafka Manager

雅虎开源的Kafka集群管理工具:

  • 管理几个不同的集群
  • 监控集群的状态(topics, brokers, 副本分布, 分区分布)
  • 产生分区分配(Generate partition assignments)基于集群的当前状态
  • 重新分配分区

 

3、KafkaOffsetMonitor

KafkaOffsetMonitor可以实时监控:

  • Kafka集群状态
  • Topic、Consumer Group列表
  • 图形化展示topic和consumer之间的关系
  • 图形化展示consumer的Offset、Lag等信息

总结:通过使用,个人总结以上三种监控程序的优缺点:

  1. Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。
  2. Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。
  3. KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。

 

若只需要监控功能,推荐使用KafkaOffsetMonito,若偏重Kafka集群管理,推荐使用Kafka Manager。

因为都是开源程序,稳定性欠缺。故需先了解清楚目前已存在哪些Bug,多测试一下,避免出现类似于Kafka Web Console的问题。

 

 

重度引用:

如何使用JMX监控Kafka:

https://blog.csdn.net/u013256816/article/details/53524884https://blog.csdn.net/u013256816/article/details/53524884

Kafka三款监控工具比较:

https://www.jianshu.com/p/97ad87a933e1

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_35488412/article/details/88695080
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-02 18:14:47
  • 阅读 ( 1711 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢