  • Kafka Monitoring

    1. Metrics

    Kafka has two metrics packages, which are easy to confuse when reading the source code:

    package kafka.metrics
    package org.apache.kafka.common.metrics

    As you can see, both packages are named metrics, but they have different responsibilities, and the classes in the two packages do not reference each other at all; they can be treated as completely independent. The kafka.metrics package mainly wraps the Yammer Metrics API and exposes the result so that clients can monitor Kafka's various performance parameters.

    The org.apache.kafka.common.metrics package is the one this article focuses on. It is not a client-facing service; it exists so that other Kafka components, such as ReplicaManager, PartitionManager, and QuotaManager, can query it to learn how Kafka is currently running and make decisions accordingly.

    Metrics is first initialized in KafkaServer's startup() method:

    metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
    quotaManagers = QuotaFactory.instantiate(config, metrics, time)

    This creates a Metrics instance and passes it into the quota managers' constructor. A quick word about quota managers: they are what Kafka uses to limit transfer rates, for example for producers. If the config says a producer may not send faster than 5 MB/s, that limit is enforced through the QuotaManager.

    Back to Metrics; let's follow the code.

    public class Metrics implements Closeable {
     ....
     ....
        private final ConcurrentMap<MetricName, KafkaMetric> metrics;
        private final ConcurrentMap<String, Sensor> sensors;

    The metrics and sensors ConcurrentMaps are two important member fields of Metrics. So what exactly are KafkaMetric and Sensor?

    Let's analyze KafkaMetric first.

    KafkaMetric implements the Metric interface; as you can see, its core method value() returns the value of the parameter being monitored.

    public interface Metric {
    
        /**
         * A name for this metric
         */
        public MetricName metricName();
    
        /**
         * The value of the metric
         */
        public double value();
    
    }

    So how does KafkaMetric implement the value() method?

    @Override
    public double value() {
        synchronized (this.lock) {
            return value(time.milliseconds());
        }
    }
    
    double value(long timeMs) {
        return this.measurable.measure(config, timeMs);
    }

    It turns out value() is implemented through another member field of KafkaMetric, measurable:

    public interface Measurable {
    
        /**
         * Measure this quantity and return the result as a double
         * @param config The configuration for this metric
         * @param now The POSIX time in milliseconds the measurement is being taken
         * @return The measured value
         */
        public double measure(MetricConfig config, long now);
    
    }

    Admittedly this is a bit convoluted: Metrics holds KafkaMetric members, and each KafkaMetric in turn returns the monitored value through a Measurable. By analogy, Metrics is a car's dashboard, a KafkaMetric is one gauge on that dashboard, and a Measurable is a wrapper around the component actually being measured. Here is a simple Measurable implementation, from the Sender.java class:

    metrics.addMetric(m, new Measurable() {
        public double measure(MetricConfig config, long now) {
            return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
        }
    });

    As you can see, the measure() implementation simply returns the desired value; because it is defined directly inside the target class, it can reference the relevant fields directly.
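
    To make the relationship concrete, here is a minimal, self-contained sketch of the same pattern. It is not taken from the Kafka source: the metric name "uptime-seconds" and group "demo-group" are invented, and it assumes a kafka-clients jar from roughly the 0.10 to 2.x era (older versions construct MetricName directly instead of calling metrics.metricName(), and newer ones replace value() with metricValue()). It registers a Measurable with a Metrics instance and reads the value back through the resulting KafkaMetric.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Measurable;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;

    public class MeasurableDemo {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Hypothetical metric: seconds since this demo started.
            final long startMs = System.currentTimeMillis();
            MetricName name = metrics.metricName("uptime-seconds", "demo-group");
            metrics.addMetric(name, new Measurable() {
                @Override
                public double measure(MetricConfig config, long now) {
                    return (now - startMs) / 1000.0;
                }
            });

            // metrics() exposes the ConcurrentMap<MetricName, KafkaMetric> shown earlier;
            // value() goes through Measurable.measure() exactly as described above.
            System.out.println(metrics.metrics().get(name).value());
            metrics.close();
        }
    }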

    Next up is Sensor, the value type of the ConcurrentMap below:

    private final ConcurrentMap<String, Sensor> sensors;

    Here is part of the Sensor class source:

    /**
     * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
     * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
     * of metrics about request sizes such as the average or max.
     */
    public final class Sensor {
        // a Kafka process has only one Metrics instance; this registry field is a reference back to it
        private final Metrics registry;
        private final String name;
        private final Sensor[] parents;
        private final List<Stat> stats;
        private final List<KafkaMetric> metrics;

    The Javadoc here is meaningful: it shows that a Sensor's role differs from a KafkaMetric's. A KafkaMetric merely returns the value of a single parameter, whereas a Sensor can compute statistics over the time series of a parameter, such as the average, maximum, or minimum. How are those statistics implemented? Through the List<Stat> stats member.

    public interface Stat {
    
        /**
         * Record the given value
         * @param config The configuration to use for this metric
         * @param value The value to record
         * @param timeMs The POSIX time in milliseconds this value occurred
         */
        public void record(MetricConfig config, double value, long timeMs);
    
    }

    Stat is an interface with a single record method for recording one sampled value. As an example, how is the max functionality implemented on top of Stat?

    public final class Max extends SampledStat {
    
        public Max() {
            super(Double.NEGATIVE_INFINITY);
        }
    
        @Override
        protected void update(Sample sample, MetricConfig config, double value, long now) {
            sample.value = Math.max(sample.value, value);
        }
    
        @Override
        public double combine(List<Sample> samples, MetricConfig config, long now) {
            double max = Double.NEGATIVE_INFINITY;
            for (int i = 0; i < samples.size(); i++)
                max = Math.max(max, samples.get(i).value);
            return max;
        }
    
    }

    Simple enough: update is like a single bubble pass, comparing the current value against the running maximum of the current sample, while combine is like a full bubble sort over all the samples to find the overall maximum. Note that Max extends SampledStat, and SampledStat is an implementation of the Stat interface.
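
    To see the SampledStat pattern once more, here is a hypothetical Min written in the same style as the Max above. Kafka already ships its own Min in org.apache.kafka.common.metrics.stats, so this is purely an illustrative sketch of the structure, not code from the project:

    import java.util.List;

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.stats.SampledStat;

    public final class MyMin extends SampledStat {

        public MyMin() {
            super(Double.POSITIVE_INFINITY); // initial value of a fresh sample
        }

        @Override
        protected void update(Sample sample, MetricConfig config, double value, long now) {
            // keep the smallest value seen within the current sample window
            sample.value = Math.min(sample.value, value);
        }

        @Override
        public double combine(List<Sample> samples, MetricConfig config, long now) {
            // fold the per-window minima into the overall minimum
            double min = Double.POSITIVE_INFINITY;
            for (Sample s : samples)
                min = Math.min(min, s.value);
            return min;
        }
    }

    Now let's return to the Sensor class and its record method.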

    public void record(double value, long timeMs) {
        this.lastRecordTime = timeMs;
        synchronized (this) {
            // increment all the stats
            for (int i = 0; i < this.stats.size(); i++)
                this.stats.get(i).record(config, value, timeMs);
            checkQuotas(timeMs);
        }
        for (int i = 0; i < parents.length; i++)
            parents[i].record(value, timeMs);
    }

    The record method submits the value to every Stat registered on this sensor and, if the sensor has parents, also propagates the value to each parent sensor.
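
    Putting Metrics, Sensor, and Stat together, here is a minimal usage sketch. It is not from the Kafka source: the sensor and metric names are invented, and it assumes a kafka-clients version that provides Metrics.metricName() and Metric.value().

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    public class SensorDemo {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // One sensor, two stats: each add() also registers a KafkaMetric behind the scenes.
            Sensor sensor = metrics.sensor("message-size");
            sensor.add(metrics.metricName("message-size-avg", "demo-group"), new Avg());
            sensor.add(metrics.metricName("message-size-max", "demo-group"), new Max());

            // record() feeds every Stat registered on this sensor (and any parent sensors).
            sensor.record(512.0);
            sensor.record(2048.0);

            metrics.metrics().forEach((name, metric) ->
                    System.out.println(name.name() + " = " + metric.value()));
            metrics.close();
        }
    }

    Next, the checkQuotas method that record() calls: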

    public void checkQuotas(long timeMs) {
        for (int i = 0; i < this.metrics.size(); i++) {
            KafkaMetric metric = this.metrics.get(i);
            MetricConfig config = metric.config();
            if (config != null) {
                Quota quota = config.quota();
                if (quota != null) {
                    double value = metric.value(timeMs);
                    if (!quota.acceptable(value)) {
                        throw new QuotaViolationException(
                            metric.metricName(),
                            value,
                            quota.bound());
                    }
                }
            }
        }
    }

    checkQuotas iterates over every KafkaMetric registered on the sensor and checks whether its value exceeds the quota set in the configuration. Note the QuotaViolationException here; it should look familiar. In the QuotaManager, if a client's produce/fetch rate exceeds its configured quota, this is exactly the exception that gets thrown.

    try {
      clientSensors.quotaSensor.record(value)
      // trigger the callback immediately if quota is not violated
      callback(0)
    } catch {
      case qve: QuotaViolationException =>
        // Compute the delay
        val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
        clientSensors.throttleTimeSensor.record(throttleTimeMs)
        // If delayed, add the element to the delayQueue
        delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
        delayQueueSensor.record()
        logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
    }

    This part is easy to follow: the client's quota sensor records the produced/fetched amount; if recording succeeds the callback is invoked immediately, and if it fails the exception being caught is the QuotaViolationException.
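
    To tie this back to the classes above, here is a small sketch showing how a Quota attached through a MetricConfig makes record() throw QuotaViolationException via checkQuotas(). The names are invented for illustration; this is not the broker's actual quota wiring:

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Quota;
    import org.apache.kafka.common.metrics.QuotaViolationException;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Rate;

    public class QuotaDemo {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("demo-byte-rate");

            // Upper bound of 1 MB/s on the rate metric registered below.
            MetricConfig quotaConfig = new MetricConfig().quota(Quota.upperBound(1024 * 1024));
            sensor.add(metrics.metricName("byte-rate", "demo-group"), new Rate(), quotaConfig);

            try {
                // A burst far above the bound makes checkQuotas() throw inside record().
                sensor.record(100.0 * 1024 * 1024);
            } catch (QuotaViolationException e) {
                System.out.println("quota violated: " + e);
            }
            metrics.close();
        }
    }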

    The metrics machinery is actually quite simple at runtime; what makes it feel convoluted is the layering of abstractions: Metrics, KafkaMetric, Sensor, and Stat.

    Finally, Metrics starts a dedicated task for removing sensors that have not been used for a long time; it runs on a thread named "SensorExpiryThread".

    class ExpireSensorTask implements Runnable {
        public void run() {
            for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
                // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
                // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
                // Calling record on C would cause a record on P as well.
                // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
                // that will cause C to also get removed.
                // Since the expiration time is typically high it is not expected to be a significant concern
                // and thus not necessary to optimize
                synchronized (sensorEntry.getValue()) {
                    if (sensorEntry.getValue().hasExpired()) {
                        log.debug("Removing expired sensor {}", sensorEntry.getKey());
                        removeSensor(sensorEntry.getKey());
                    }
                }
            }
        }
    }
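
    The expiration window itself is attached when a sensor is created. In client versions that support sensor expiration there is a sensor(...) overload taking an inactiveSensorExpirationTimeSeconds argument; the following is a minimal sketch with invented names, assuming such a version:

    import java.util.Collections;

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.MetricsReporter;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.utils.Time;

    public class ExpiringSensorDemo {
        public static void main(String[] args) {
            // enableExpiration=true is what starts the "SensorExpiryThread", just as in KafkaServer.startup().
            Metrics metrics = new Metrics(new MetricConfig(), Collections.<MetricsReporter>emptyList(),
                    Time.SYSTEM, true);
            // A sensor that ExpireSensorTask may remove after 300 seconds without a record().
            Sensor shortLived = metrics.sensor("demo-short-lived", new MetricConfig(), 300L);
            shortLived.record(1.0);
            metrics.close();
        }
    }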

    2. JMX

    This part shows how to monitor Kafka via JMX by reading a few of Kafka's monitored attributes.

    Before using JMX, make sure Kafka is started with JMX enabled by adding JMX_PORT=9999 to the startup command, i.e.:

    JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &

    I set up a Kafka cluster with just two nodes. The cluster has one topic (name=default_channel_kafka_zzh_demo) with 5 partitions (0 1 2 3 4).

    The Kafka versions discussed here are 0.8.1.x and 0.8.2.x, and the two differ when monitored over JMX; the difference shows up in the ObjectName. As Kafka users know, Kafka has the concepts of topic and partition, and a topic is divided into several partitions according to some strategy. Using that as the example:
    in 0.8.1.x the ObjectName (a String) for this item is:
    "kafka.log":type="Log",name="default_channel_kafka_zzh_demo-*-LogEndOffset"

    while in 0.8.2.x the corresponding ObjectName is:
    kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0

    So the code has to handle the two versions differently.

    Three monitored items are used here to demonstrate JMX-based monitoring:

    1. The offsets mentioned above (the LogEndOffset of every partition of a topic in the cluster, i.e. the logSize)
    2. sendCount (the total number of messages sent to a topic in the cluster, which is the sum of this topic's counts over every broker)
    3. sendTps (the TPS of a topic in the cluster, likewise summed over every broker)

    First, the code that targets a single Kafka broker.

    package kafka.jmx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import java.io.IOException;
    import java.net.MalformedURLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * Created by hidden on 2016/12/8.
     */
    public class JmxConnection {
        private static Logger log = LoggerFactory.getLogger(JmxConnection.class);
    
        private MBeanServerConnection conn;
        private String jmxURL;
        private String ipAndPort = "localhost:9999";
        private int port = 9999;
        private boolean newKafkaVersion = false;
    
        public JmxConnection(Boolean newKafkaVersion, String ipAndPort){
            this.newKafkaVersion = newKafkaVersion;
            this.ipAndPort = ipAndPort;
        }
    
        public boolean init(){
            jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
            log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);
            try {
                JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
                JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
                conn = connector.getMBeanServerConnection();
                if(conn == null){
                   log.error("get connection return null!");
                    return  false;
                }
            } catch (MalformedURLException e) {
                e.printStackTrace();
                return false;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
        public String getTopicName(String topicName){
            String s;
            if (newKafkaVersion) {
                s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
            } else {
                s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
            }
            return s;
        }
    
        /**
         * @param topicName: topic name, default_channel_kafka_zzh_demo
         * @return the message-in count for this broker (to get a topic's cluster-wide total, sum this value over every broker)
         */
        public long getMsgInCountPerSec(String topicName){
            String objectName = getTopicName(topicName);
            Object val = getAttribute(objectName,"Count");
            String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;
            if(val !=null){
                log.info("{}, Count:{}",debugInfo,(long)val);
                return (long)val;
            }
            return 0;
        }
    
        /**
         * @param topicName: topic name, default_channel_kafka_zzh_demo
         * @return the message-in TPS for this broker; as with the count, a topic's cluster-wide TPS is the sum of this value over every broker
         */
        public double getMsgInTpsPerSec(String topicName){
            String objectName = getTopicName(topicName);
            Object val = getAttribute(objectName,"OneMinuteRate");
            if(val !=null){
                double dVal = ((Double)val).doubleValue();
                return dVal;
            }
            return 0;
        }
    
        private Object getAttribute(String objName, String objAttr)
        {
            ObjectName objectName =null;
            try {
                objectName = new ObjectName(objName);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return null;
            }
            return getAttribute(objectName,objAttr);
        }
    
        private Object getAttribute(ObjectName objName, String objAttr){
            if(conn== null)
            {
                log.error("jmx connection is null");
                return null;
            }
    
            try {
                return conn.getAttribute(objName,objAttr);
            } catch (MBeanException e) {
                e.printStackTrace();
                return null;
            } catch (AttributeNotFoundException e) {
                e.printStackTrace();
                return null;
            } catch (InstanceNotFoundException e) {
                e.printStackTrace();
                return null;
            } catch (ReflectionException e) {
                e.printStackTrace();
                return null;
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    
        /**
         * @param topicName
         * @return a map from partition id to that partition's logSize (i.e. LogEndOffset/offset) for topicName
         */
        public Map<Integer,Long> getTopicEndOffset(String topicName){
            Set<ObjectName> objs = getEndOffsetObjects(topicName);
            if(objs == null){
                return null;
            }
            Map<Integer, Long> map = new HashMap<>();
            for(ObjectName objName:objs){
                int partId = getParId(objName);
                Object val = getAttribute(objName,"Value");
                if(val !=null){
                    map.put(partId,(Long)val);
                }
            }
            return map;
        }
    
        private int getParId(ObjectName objName){
            if(newKafkaVersion){
                String s = objName.getKeyProperty("partition");
                return Integer.parseInt(s);
            }else {
                String s = objName.getKeyProperty("name");
    
                int to = s.lastIndexOf("-LogEndOffset");
                String s1 = s.substring(0, to);
                int from = s1.lastIndexOf("-") + 1;
    
                String ss = s.substring(from, to);
                return Integer.parseInt(ss);
            }
        }
    
        private Set<ObjectName> getEndOffsetObjects(String topicName){
            String objectName;
            if (newKafkaVersion) {
                objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";
            }else{
                objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
            }
            ObjectName objName = null;
            Set<ObjectName> objectNames = null;
            try {
                objName = new ObjectName(objectName);
                objectNames = conn.queryNames(objName,null);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return  null;
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
    
            return objectNames;
        }
    }

    Note how the code handles the two Kafka versions differently. The methods corresponding to the three monitored items described earlier are:

    public Map<Integer,Long> getTopicEndOffset(String topicName)
    public long getMsgInCountPerSec(String topicName)
    public double getMsgInTpsPerSec(String topicName)
    

    Handling the whole cluster requires another class; essentially it sums the corresponding value over every broker in the cluster.

    package kafka.jmx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Created by hidden on 2016/12/8.
     */
    public class JmxMgr {
        private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
        private static List<JmxConnection> conns = new ArrayList<>();
    
        public static boolean init(List<String> ipPortList, boolean newKafkaVersion){
            for(String ipPort:ipPortList){
                log.info("init jmxConnection [{}]",ipPort);
                JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
                boolean bRet = conn.init();
                if(!bRet){
                    log.error("init jmxConnection error");
                    return false;
                }
                conns.add(conn);
            }
            return true;
        }
    
        public static long getMsgInCountPerSec(String topicName){
            long val = 0;
            for(JmxConnection conn:conns){
                long temp = conn.getMsgInCountPerSec(topicName);
                val += temp;
            }
            return val;
        }
    
        public static double getMsgInTpsPerSec(String topicName){
            double val = 0;
            for(JmxConnection conn:conns){
                double temp = conn.getMsgInTpsPerSec(topicName);
                val += temp;
            }
            return val;
        }
    
        public static Map<Integer, Long> getEndOffset(String topicName){
            Map<Integer,Long> map = new HashMap<>();
            for(JmxConnection conn:conns){
                Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);
                if(tmp == null){
                    log.warn("get topic endoffset return null, topic {}", topicName);
                    continue;
                }
                for(Integer parId:tmp.keySet()){ // keep the larger value if a partition appears on more than one broker
                    if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){
                        map.put(parId, tmp.get(parId));
                    }
                }
            }
            return map;
        }
    
        public static void main(String[] args) {
            List<String> ipPortList = new ArrayList<>();
            ipPortList.add("xx.101.130.1:9999");
            ipPortList.add("xx.101.130.2:9999");
            JmxMgr.init(ipPortList,true);
    
            String topicName = "default_channel_kafka_zzh_demo";
            System.out.println(getMsgInCountPerSec(topicName));
            System.out.println(getMsgInTpsPerSec(topicName));
            System.out.println(getEndOffset(topicName));
        }
    }
    

    Results:

    2016-12-08 19:25:32 -[INFO] - [init jmxConnection [xx.101.130.1:9999]] - [kafka.jmx.JmxMgr:20]
    2016-12-08 19:25:32 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
    2016-12-08 19:25:33 -[INFO] - [init jmxConnection [xx.101.130.2:9999]] - [kafka.jmx.JmxMgr:20]
    2016-12-08 19:25:33 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
    2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:6000] - [kafka.jmx.JmxConnection:73]
    2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:4384] - [kafka.jmx.JmxConnection:73]
    10384
    3.915592283987704E-65
    {0=2072, 1=2084, 2=2073, 3=2083, 4=2072}

    3. Kafka Manager

  • Original article: https://www.cnblogs.com/wangleBlogs/p/9759099.html