package kafka.metrics
package org.apache.kafka.common.metrics
// Build the broker's Metrics registry. The trailing `true` is presumably the `enableExpiration`
// flag that lets idle sensors be expired (cf. ExpireSensorTask) -- confirm against the Metrics constructor.
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
// The quota managers are handed the same registry; presumably they register their quota sensors in it.
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
public class Metrics implements Closeable { .... .... private final ConcurrentMap<MetricName, KafkaMetric> metrics; private final ConcurrentMap<String, Sensor> sensors;
/**
 * A single named metric whose current value can be read.
 */
public interface Metric {

    /**
     * @return the name identifying this metric
     */
    MetricName metricName();

    /**
     * @return the current value of this metric
     */
    double value();
}
/**
 * Reads the metric at the current time.
 *
 * <p>The clock is sampled while {@code lock} is held, then the timestamped overload is used.
 */
@Override
public double value() {
    synchronized (this.lock) {
        final long nowMs = time.milliseconds();
        return value(nowMs);
    }
}

/**
 * Reads the metric as of {@code timeMs} by delegating to the underlying measurable.
 *
 * @param timeMs the time, in milliseconds, of the measurement
 * @return the measured value
 */
double value(long timeMs) {
    return measurable.measure(config, timeMs);
}
/**
 * A quantity that can be measured at a given instant.
 */
public interface Measurable {

    /**
     * Takes one measurement of this quantity.
     *
     * @param config the configuration of the metric being measured
     * @param now    the POSIX time, in milliseconds, at which the measurement is taken
     * @return the measured value
     */
    double measure(MetricConfig config, long now);
}
// Register metric `m` backed by an anonymous Measurable whose value is the time elapsed since
// metadata.lastSuccessfulUpdate(). `now` is in milliseconds (per Measurable.measure), and dividing
// by 1000.0 yields seconds -- assuming lastSuccessfulUpdate() is also in milliseconds (TODO confirm).
metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return (now - metadata.lastSuccessfulUpdate()) / 1000.0; } });
private final ConcurrentMap<String, Sensor> sensors;
/** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set * of metrics about request sizes such as the average or max. */ public final class Sensor { //一个kafka就只有一个Metrics实例,这个registry就是对这个Metrics的引用 private final Metrics registry; private final String name; private final Sensor[] parents; private final List<Stat> stats; private final List<KafkaMetric> metrics;
这一段注释很有意义。从注释中可以看出，Sensor的作用不同于KafkaMetric：KafkaMetric仅仅返回某一个参数的值，而Sensor能够基于某一参数的时间序列进行统计，比如平均值、最大值、最小值。那么这些统计又是如何实现的呢？答案就是List<Stat> stats这个成员属性。
/**
 * A statistic that accumulates recorded values.
 */
public interface Stat {

    /**
     * Records one value.
     *
     * @param config the configuration of the metric this stat belongs to
     * @param value  the value being recorded
     * @param timeMs the POSIX time, in milliseconds, at which the value occurred
     */
    void record(MetricConfig config, double value, long timeMs);
}
/**
 * A sampled statistic that reports the largest value seen across all of its samples.
 */
public final class Max extends SampledStat {

    /**
     * Passes negative infinity to {@link SampledStat} -- presumably the starting value of each
     * sample, so the first recorded value always replaces it (confirm against SampledStat).
     */
    public Max() {
        super(Double.NEGATIVE_INFINITY);
    }

    /** Keeps the running maximum for the given sample. */
    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        sample.value = Math.max(sample.value, value);
    }

    /** Returns the maximum over all samples, or negative infinity when there are none. */
    @Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        double largest = Double.NEGATIVE_INFINITY;
        for (Sample s : samples) {
            largest = Math.max(largest, s.value);
        }
        return largest;
    }
}
public void record(double value, long timeMs) { this.lastRecordTime = timeMs; synchronized (this) { // increment all the stats for (int i = 0; i < this.stats.size(); i++) this.stats.get(i).record(config, value, timeMs); checkQuotas(timeMs); } for (int i = 0; i < parents.length; i++) parents[i].record(value, timeMs); }
/**
 * Verifies every metric of this sensor that has a quota configured.
 *
 * <p>Metrics without a config, or whose config has no quota, are skipped.
 *
 * @param timeMs the time, in milliseconds, at which the metrics are evaluated
 * @throws QuotaViolationException for the first metric whose value is not accepted by its quota
 */
public void checkQuotas(long timeMs) {
    for (int idx = 0; idx < this.metrics.size(); idx++) {
        KafkaMetric metric = this.metrics.get(idx);
        MetricConfig metricConfig = metric.config();
        if (metricConfig == null) {
            continue;
        }
        Quota quota = metricConfig.quota();
        if (quota == null) {
            continue;
        }
        double observed = metric.value(timeMs);
        if (!quota.acceptable(observed)) {
            throw new QuotaViolationException(metric.metricName(), observed, quota.bound());
        }
    }
}
// Record the value on the client's quota sensor. A QuotaViolationException from the record call
// (presumably raised by Sensor.checkQuotas, which record invokes) selects the throttling branch.
try {
  clientSensors.quotaSensor.record(value)
  // trigger the callback immediately if quota is not violated
  callback(0)
} catch {
  case qve: QuotaViolationException =>
    // Compute the delay from the client's rate metric and its quota metric config
    val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
    throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
    clientSensors.throttleTimeSensor.record(throttleTimeMs)
    // If delayed, add the element to the delayQueue
    delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
    delayQueueSensor.record()
    logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
class ExpireSensorTask implements Runnable { public void run() { for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) { // removeSensor also locks the sensor object. This is fine because synchronized is reentrant // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C. // Calling record on C would cause a record on P as well. // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed, // that will cause C to also get removed. // Since the expiration time is typically high it is not expected to be a significant concern // and thus not necessary to optimize synchronized (sensorEntry.getValue()) { if (sensorEntry.getValue().hasExpired()) { log.debug("Removing expired sensor {}", sensorEntry.getKey()); removeSensor(sensorEntry.getKey()); } } } }
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
自行搭建了一个Kafka集群，只有两个节点。集群中有一个topic（name=default_channel_kafka_zzh_demo），分为5个partition（0、1、2、3、4）。
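这里讨论的Kafka版本是0.8.1.x和0.8.2.x，这两者在使用JMX监控时会有差异，差异体现在ObjectName之中（例如，某个topic的消息流入指标在0.8.2.x中为kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=<topic>，而在0.8.1.x中则是"kafka.server":type="BrokerTopicMetrics",name="<topic>-MessagesInPerSec"）。熟悉Kafka的同学知道，Kafka有topic和partition这两个概念，一个topic会根据一定的策略分为若干个partition。这里就以此为例，来看如何获取以下几个指标：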
- 上面所说的offset（集群中某个topic下所有partition的LogEndOffset值，即logSize）
- sendCount（集群中某个topic的消息发送总量，这个值是集群中各个broker上此topic的发送量之和）
- sendTps（集群中某个topic的TPS，这个值是集群中各个broker上此topic的TPS之和）
首先是针对单个Kafka broker的实现：
package kafka.jmx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.management.*; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.io.IOException; import java.net.MalformedURLException; import java.util.HashMap; import java.util.Map; import java.util.Set; /** * Created by hidden on 2016/12/8. */ public class JmxConnection { private static Logger log = LoggerFactory.getLogger(JmxConnection.class); private MBeanServerConnection conn; private String jmxURL; private String ipAndPort = "localhost:9999"; private int port = 9999; private boolean newKafkaVersion = false; public JmxConnection(Boolean newKafkaVersion, String ipAndPort){ this.newKafkaVersion = newKafkaVersion; this.ipAndPort = ipAndPort; } public boolean init(){ jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi"; log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL); try { JMXServiceURL serviceURL = new JMXServiceURL(jmxURL); JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null); conn = connector.getMBeanServerConnection(); if(conn == null){ log.error("get connection return null!"); return false; } } catch (MalformedURLException e) { e.printStackTrace(); return false; } catch (IOException e) { e.printStackTrace(); return false; } return true; } public String getTopicName(String topicName){ String s; if (newKafkaVersion) { s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName; } else { s = ""kafka.server":type="BrokerTopicMetrics",name="" + topicName + "-MessagesInPerSec""; } return s; } /** * @param topicName: topic name, default_channel_kafka_zzh_demo * @return 获取发送量(单个broker的,要计算某个topic的总的发送量就要计算集群中每一个broker之和) */ public long getMsgInCountPerSec(String topicName){ String objectName = getTopicName(topicName); Object val = getAttribute(objectName,"Count"); String debugInfo = 
"jmxUrl:"+jmxURL+",objectName="+objectName; if(val !=null){ log.info("{}, Count:{}",debugInfo,(long)val); return (long)val; } return 0; } /** * @param topicName: topic name, default_channel_kafka_zzh_demo * @return 获取发送的tps,和发送量一样如果要计算某个topic的发送量就需要计算集群中每一个broker中此topic的tps之和。 */ public double getMsgInTpsPerSec(String topicName){ String objectName = getTopicName(topicName); Object val = getAttribute(objectName,"OneMinuteRate"); if(val !=null){ double dVal = ((Double)val).doubleValue(); return dVal; } return 0; } private Object getAttribute(String objName, String objAttr) { ObjectName objectName =null; try { objectName = new ObjectName(objName); } catch (MalformedObjectNameException e) { e.printStackTrace(); return null; } return getAttribute(objectName,objAttr); } private Object getAttribute(ObjectName objName, String objAttr){ if(conn== null) { log.error("jmx connection is null"); return null; } try { return conn.getAttribute(objName,objAttr); } catch (MBeanException e) { e.printStackTrace(); return null; } catch (AttributeNotFoundException e) { e.printStackTrace(); return null; } catch (InstanceNotFoundException e) { e.printStackTrace(); return null; } catch (ReflectionException e) { e.printStackTrace(); return null; } catch (IOException e) { e.printStackTrace(); return null; } } /** * @param topicName * @return 获取topicName中每个partition所对应的logSize(即offset) */ public Map<Integer,Long> getTopicEndOffset(String topicName){ Set<ObjectName> objs = getEndOffsetObjects(topicName); if(objs == null){ return null; } Map<Integer, Long> map = new HashMap<>(); for(ObjectName objName:objs){ int partId = getParId(objName); Object val = getAttribute(objName,"Value"); if(val !=null){ map.put(partId,(Long)val); } } return map; } private int getParId(ObjectName objName){ if(newKafkaVersion){ String s = objName.getKeyProperty("partition"); return Integer.parseInt(s); }else { String s = objName.getKeyProperty("name"); int to = s.lastIndexOf("-LogEndOffset"); String s1 = s.substring(0, 
to); int from = s1.lastIndexOf("-") + 1; String ss = s.substring(from, to); return Integer.parseInt(ss); } } private Set<ObjectName> getEndOffsetObjects(String topicName){ String objectName; if (newKafkaVersion) { objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*"; }else{ objectName = ""kafka.log":type="Log",name="" + topicName + "-*-LogEndOffset""; } ObjectName objName = null; Set<ObjectName> objectNames = null; try { objName = new ObjectName(objectName); objectNames = conn.queryNames(objName,null); } catch (MalformedObjectNameException e) { e.printStackTrace(); return null; } catch (IOException e) { e.printStackTrace(); return null; } return objectNames; } }
public Map<Integer,Long> getTopicEndOffset(String topicName) public long getMsgInCountPerSec(String topicName) public double getMsgInTpsPerSec(String topicName)
package kafka.jmx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by hidden on 2016/12/8. */ public class JmxMgr { private static Logger log = LoggerFactory.getLogger(JmxMgr.class); private static List<JmxConnection> conns = new ArrayList<>(); public static boolean init(List<String> ipPortList, boolean newKafkaVersion){ for(String ipPort:ipPortList){ log.info("init jmxConnection [{}]",ipPort); JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort); boolean bRet = conn.init(); if(!bRet){ log.error("init jmxConnection error"); return false; } conns.add(conn); } return true; } public static long getMsgInCountPerSec(String topicName){ long val = 0; for(JmxConnection conn:conns){ long temp = conn.getMsgInCountPerSec(topicName); val += temp; } return val; } public static double getMsgInTpsPerSec(String topicName){ double val = 0; for(JmxConnection conn:conns){ double temp = conn.getMsgInTpsPerSec(topicName); val += temp; } return val; } public static Map<Integer, Long> getEndOffset(String topicName){ Map<Integer,Long> map = new HashMap<>(); for(JmxConnection conn:conns){ Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName); if(tmp == null){ log.warn("get topic endoffset return null, topic {}", topicName); continue; } for(Integer parId:tmp.keySet()){//change if bigger if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){ map.put(parId, tmp.get(parId)); } } } return map; } public static void main(String[] args) { List<String> ipPortList = new ArrayList<>(); ipPortList.add("xx.101.130.1:9999"); ipPortList.add("xx.101.130.2:9999"); JmxMgr.init(ipPortList,true); String topicName = "default_channel_kafka_zzh_demo"; System.out.println(getMsgInCountPerSec(topicName)); System.out.println(getMsgInTpsPerSec(topicName)); System.out.println(getEndOffset(topicName)); } }
2016-12-08 19:25:32 -[INFO] - [init jmxConnection [xx.101.130.1:9999]] - [kafka.jmx.JmxMgr:20] 2016-12-08 19:25:32 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35] 2016-12-08 19:25:33 -[INFO] - [init jmxConnection [xx.101.130.2:9999]] - [kafka.jmx.JmxMgr:20] 2016-12-08 19:25:33 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35] 2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:6000] - [kafka.jmx.JmxConnection:73] 2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:4384] - [kafka.jmx.JmxConnection:73] 10384 3.915592283987704E-65 {0=2072, 1=2084, 2=2073, 3=2083, 4=2072}
