zoukankan      html  css  js  c++  java
  • 【Dubbo】Monitor层实现简述

    一. 概述

    1. 版本:2.7.8

    2. 说明

    Monitor 监控层
        
        用来统计RPC 调用次数和调用耗时时间,扩展接口为MonitorFactory,对应的实现类为DubboMonitorFactroy。
        
        用户可以实现该层的MonitorFactory扩展接口,实现自定义监控统计策略。
    

    二. MonitorFactory工厂实现

    1. 结构图

    在这里插入图片描述


    2. MonitorFactory接口

    1. Monitor工厂接口
    2. 默认实现为DubboMonitorFactory
    @SPI("dubbo")
    public interface MonitorFactory {
    
        /**
         * Create monitor.
         *
         * @param url
         * @return monitor
         */
        @Adaptive("protocol")
        Monitor getMonitor(URL url);
    
    }
    

    3. AbstractMonitorFactory抽象类

    1. 通过加锁的方式创建监控对象(DubboMonitor)
    2. 每个URL作为一个监控单元
    3. getMonitor()方法获取Monitor对象:如存在,则直接返回
    4. getMonitor()方法获取Monitor对象:如不存在,则加锁初始化并缓存在静态变量中
    public abstract class AbstractMonitorFactory implements MonitorFactory {
    
        private static final Logger logger = LoggerFactory.getLogger(AbstractMonitorFactory.class);
    
        /**
         * The lock for getting monitor center
         */
        private static final ReentrantLock LOCK = new ReentrantLock();
    
        /**
         * The monitor centers Map<RegistryAddress, Registry>
         */
        private static final Map<String, Monitor> MONITORS = new ConcurrentHashMap<>();
    
        private static final Map<String, CompletableFuture<Monitor>> FUTURES = new ConcurrentHashMap<>();
    
        /**
         * The monitor create executor
         */
        private static final ExecutorService EXECUTOR = 
        new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 
            new NamedThreadFactory("DubboMonitorCreator", true));
    
        public static Collection<Monitor> getMonitors() {
            return Collections.unmodifiableCollection(MONITORS.values());
        }
    
        @Override
        public Monitor getMonitor(URL url) {
    
            url = url.setPath(MonitorService.class.getName()).addParameter(INTERFACE_KEY, MonitorService.class.getName());
            String key = url.toServiceStringWithoutResolving();
            Monitor monitor = MONITORS.get(key);
            Future<Monitor> future = FUTURES.get(key);
            if (monitor != null || future != null) {
                return monitor;
            }
    
            LOCK.lock();
    
            try {
    
                monitor = MONITORS.get(key);
                future = FUTURES.get(key);
                if (monitor != null || future != null) {
                    return monitor;
                }
    
                final URL monitorUrl = url;
                final CompletableFuture<Monitor> completableFuture 
                = CompletableFuture.supplyAsync(
                () -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
                FUTURES.put(key, completableFuture);
                completableFuture.thenRunAsync(new MonitorListener(key), EXECUTOR);
    
                return null;
            } finally {
                // unlock
                LOCK.unlock();
            }
        }
    
        // 抽象方法,由DubboMonitorFactory子类实现。主要是创建DubboMonitor对象
        protected abstract Monitor createMonitor(URL url);
    
    
        class MonitorListener implements Runnable {
    
            private String key;
    
            public MonitorListener(String key) {
                this.key = key;
            }
    
            @Override
            public void run() {
            
                try {
                
                    CompletableFuture<Monitor> completableFuture = AbstractMonitorFactory.FUTURES.get(key);
                    AbstractMonitorFactory.MONITORS.put(key, completableFuture.get());
                    AbstractMonitorFactory.FUTURES.remove(key);
                    
                } catch (InterruptedException e) {
                
                    logger.warn("Thread was interrupted unexpectedly, monitor will never be got.");
                    AbstractMonitorFactory.FUTURES.remove(key);
                    
                } catch (ExecutionException e) {
                
                    logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e);
                }
            }
        }
    
    }
    
    

    4. DubboMonitorFactory实现类

    1. createMonitor方法创建DubboMonitor对象,该对象构造时创建定时器周期统计数据
    public class DubboMonitorFactory extends AbstractMonitorFactory {
    
        private Protocol protocol;
    
        private ProxyFactory proxyFactory;
    
        public void setProtocol(Protocol protocol) {
            this.protocol = protocol;
        }
    
        public void setProxyFactory(ProxyFactory proxyFactory) {
            this.proxyFactory = proxyFactory;
        }
    
        @Override
        protected Monitor createMonitor(URL url) {
    
            URLBuilder urlBuilder = URLBuilder.from(url);
            urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
            if (StringUtils.isEmpty(url.getPath())) {
                urlBuilder.setPath(MonitorService.class.getName());
            }
            String filter = url.getParameter(REFERENCE_FILTER_KEY);
            if (StringUtils.isEmpty(filter)) {
                filter = "";
            } else {
                filter = filter + ",";
            }
            
            urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
                    REFERENCE_FILTER_KEY, filter + "-monitor");
            
            Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
            MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
            
            return new DubboMonitor(monitorInvoker, monitorService);
        }
    
    }
    

    三. Monitor实现

    1. 结构图

    在这里插入图片描述


    2. MonitorService接口

    1. collect方法:用于统计监控信息
    2. lookup方法:查看监控数据
    public interface MonitorService {
    
        String APPLICATION = "application";
    
        String INTERFACE = "interface";
    
        String METHOD = "method";
    
        String GROUP = "group";
    
        String VERSION = "version";
    
        String CONSUMER = "consumer";
    
        String PROVIDER = "provider";
    
        String TIMESTAMP = "timestamp";
    
        String SUCCESS = "success";
    
        String FAILURE = "failure";
    
        String INPUT = INPUT_KEY;
    
        String OUTPUT = OUTPUT_KEY;
    
        String ELAPSED = "elapsed";
    
        String CONCURRENT = "concurrent";
    
        String MAX_INPUT = "max.input";
    
        String MAX_OUTPUT = "max.output";
    
        String MAX_ELAPSED = "max.elapsed";
    
        String MAX_CONCURRENT = "max.concurrent";
    
        /**
         * Collect monitor data
         * 1. support invocation count: count://host/interface?application=foo&method=foo&provider=10.20.153.11:20880&success=12&failure=2&elapsed=135423423
         * 1.1 host,application,interface,group,version,method: record source host/application/interface/method
         * 1.2 add provider address parameter if it's data sent from consumer, otherwise, add source consumer's address in parameters
         * 1.3 success,failure,elapsed: record success count, failure count, and total cost for success invocations, average cost (total cost/success calls)
         *
         * @param statistics
         */
        void collect(URL statistics);
    
        /**
         * Lookup monitor data
         * 1. support lookup by day: count://host/interface?application=foo&method=foo&side=provider&view=chart&date=2012-07-03
         * 1.1 host,application,interface,group,version,method: query criteria for looking up by host, application, interface, method. When one criterion is not present, it means ALL will be accepted, but 0.0.0.0 is ALL for host
         * 1.2 side=consumer,provider: decide the data from which side, both provider and consumer are returned by default
         * 1.3 default value is view=summary, to return the summarized data for the whole day. view=chart will return the URL address showing the whole day trend which is convenient for embedding in other web page
         * 1.4 date=2012-07-03: specify the date to collect the data, today is the default value
         *
         * @param query
         * @return statistics
         */
        List<URL> lookup(URL query);
    
    }
    

    3. Monitor接口

    public interface Monitor extends Node, MonitorService {
    
    }
    

    4. DubboMonitor实现类

    1. 数据从URL收集
    2. 构造函数创建定时任务(scheduledExecutorService.scheduleWithFixedDelay)
    3. 定时任务周期调用send()方法统计数据
    public class DubboMonitor implements Monitor {
    
        private static final Logger logger = LoggerFactory.getLogger(DubboMonitor.class);
    
        /**
         * The length of the array which is a container of the statistics
         */
        private static final int LENGTH = 10;
    
        /**
         * The timer for sending statistics
         */
        private final ScheduledExecutorService scheduledExecutorService
                = Executors.newScheduledThreadPool(3, new NamedThreadFactory("DubboMonitorSendTimer", true));
    
        /**
         * The future that can cancel the <b>scheduledExecutorService</b>
         */
        private final ScheduledFuture<?> sendFuture;
    
        private final Invoker<MonitorService> monitorInvoker;
    
        private final MonitorService monitorService;
    
        private final ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap = new ConcurrentHashMap<Statistics, AtomicReference<long[]>>();
    
        public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        
            this.monitorInvoker = monitorInvoker;
            this.monitorService = monitorService;
            // The time interval for timer <b>scheduledExecutorService</b> to send data
            final long monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
            // collect timer for collecting statistics data
            sendFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
                try {
                    // collect data
                    send();
                } catch (Throwable t) {
                    logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
                }
            }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
        }
    
        public void send() {
    
            if (logger.isDebugEnabled()) {
                logger.debug("Send statistics to monitor " + getUrl());
            }
    
            String timestamp = String.valueOf(System.currentTimeMillis());
            for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
    
                // get statistics data
                Statistics statistics = entry.getKey();
                AtomicReference<long[]> reference = entry.getValue();
                long[] numbers = reference.get();
                long success = numbers[0];
                long failure = numbers[1];
                long input = numbers[2];
                long output = numbers[3];
                long elapsed = numbers[4];
                long concurrent = numbers[5];
                long maxInput = numbers[6];
                long maxOutput = numbers[7];
                long maxElapsed = numbers[8];
                long maxConcurrent = numbers[9];
                String protocol = getUrl().getParameter(DEFAULT_PROTOCOL);
    
                // send statistics data
                URL url = statistics.getUrl()
                        .addParameters(MonitorService.TIMESTAMP, timestamp,
                                MonitorService.SUCCESS, String.valueOf(success),
                                MonitorService.FAILURE, String.valueOf(failure),
                                MonitorService.INPUT, String.valueOf(input),
                                MonitorService.OUTPUT, String.valueOf(output),
                                MonitorService.ELAPSED, String.valueOf(elapsed),
                                MonitorService.CONCURRENT, String.valueOf(concurrent),
                                MonitorService.MAX_INPUT, String.valueOf(maxInput),
                                MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
                                MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
                                MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent),
                                DEFAULT_PROTOCOL, protocol
                        );
                monitorService.collect(url);
    
                // reset
                long[] current;
                long[] update = new long[LENGTH];
                do {
                    current = reference.get();
                    if (current == null) {
                        update[0] = 0;
                        update[1] = 0;
                        update[2] = 0;
                        update[3] = 0;
                        update[4] = 0;
                        update[5] = 0;
                    } else {
                        update[0] = current[0] - success;
                        update[1] = current[1] - failure;
                        update[2] = current[2] - input;
                        update[3] = current[3] - output;
                        update[4] = current[4] - elapsed;
                        update[5] = current[5] - concurrent;
                    }
                } while (!reference.compareAndSet(current, update));
            }
        }
    
        @Override
        public void collect(URL url) {
    
            // data to collect from url
            int success = url.getParameter(MonitorService.SUCCESS, 0);
            int failure = url.getParameter(MonitorService.FAILURE, 0);
            int input = url.getParameter(MonitorService.INPUT, 0);
            int output = url.getParameter(MonitorService.OUTPUT, 0);
            int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
            int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
    
            // init atomic reference
            Statistics statistics = new Statistics(url);
            AtomicReference<long[]> reference = statisticsMap.computeIfAbsent(statistics, k -> new AtomicReference<>());
    
            // use CompareAndSet to sum
            long[] current;
            long[] update = new long[LENGTH];
            do {
                current = reference.get();
                if (current == null) {
    
                    update[0] = success;
                    update[1] = failure;
                    update[2] = input;
                    update[3] = output;
                    update[4] = elapsed;
                    update[5] = concurrent;
                    update[6] = input;
                    update[7] = output;
                    update[8] = elapsed;
                    update[9] = concurrent;
                } else {
    
                    update[0] = current[0] + success;
                    update[1] = current[1] + failure;
                    update[2] = current[2] + input;
                    update[3] = current[3] + output;
                    update[4] = current[4] + elapsed;
                    update[5] = (current[5] + concurrent) / 2;
                    update[6] = current[6] > input ? current[6] : input;
                    update[7] = current[7] > output ? current[7] : output;
                    update[8] = current[8] > elapsed ? current[8] : elapsed;
                    update[9] = current[9] > concurrent ? current[9] : concurrent;
                }
            } while (!reference.compareAndSet(current, update));
        }
    
        @Override
        public List<URL> lookup(URL query) {
            return monitorService.lookup(query);
        }
    
        @Override
        public URL getUrl() {
            return monitorInvoker.getUrl();
        }
    
        @Override
        public boolean isAvailable() {
            return monitorInvoker.isAvailable();
        }
    
        @Override
        public void destroy() {
            try {
                ExecutorUtil.cancelScheduledFuture(sendFuture);
            } catch (Throwable t) {
                logger.error("Unexpected error occur at cancel sender timer, cause: " + t.getMessage(), t);
            }
            monitorInvoker.destroy();
        }
    
    }
    

    四. 过滤器统计监控数据

    1. 结构图

    在这里插入图片描述


    2. Filter接口

    1. invoke()方法 :执行过滤逻辑
    2. onResponse()方法 :正常执行后执行相关逻辑
    3. onError()方法 :发生异常后执行相关逻辑
    
    @SPI
    public interface Filter {
    
        /**
         * Make sure call invoker.invoke() in your implementation.
         */
        Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
    
        interface Listener {
    
            void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
    
            void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
        }
    
    }
    
    

    3. MonitorFilter实现类

    1. invoke()方法:并发计数
    2. monitor.collect(statisticsURL):正常或异常执行后调用DubboMonitor.collect()收集数据
    
    @Activate(group = {PROVIDER, CONSUMER})
    public class MonitorFilter implements Filter, Filter.Listener {
    
        private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
        private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
    
        /**
         * The Concurrent counter
         */
        private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
    
        /**
         * The MonitorFactory
         */
        private MonitorFactory monitorFactory;
    
        public void setMonitorFactory(MonitorFactory monitorFactory) {
            this.monitorFactory = monitorFactory;
        }
    
    
        /**
         * The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center
         *
         * @param invoker    service
         * @param invocation invocation.
         * @return {@link Result} the invoke result
         * @throws RpcException
         */
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
                getConcurrent(invoker, invocation).incrementAndGet(); // count up
            }
            return invoker.invoke(invocation); // proceed invocation chain
        }
    
        // concurrent counter
        private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
            String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
            return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
        }
    
        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
    
        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
    
        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    
        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);
    
            int localPort;
            String remoteKey, remoteValue;
            
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }
    
            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR +
            method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service,
            MonitorService.METHOD, method, remoteKey, remoteValue, 
            error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED,
            String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), 
            INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
        }
    
    }
    
  • 相关阅读:
    Azure Functions(一)什么是 ServerLess
    Azure Terraform(八)利用Azure DevOps 实现Infra资源和.NET CORE Web 应用程序的持续集成、持续部署
    Azure Terraform(六)Common Module
    Azure Terraform(五)利用Azure DevOps 实现自动化部署基础资源
    Azure Terraform(四)状态文件存储
    Java | zuul 1.x 是如何实现请求转发的
    Go | Go 结合 Consul 实现动态反向代理
    Java | 在 Java 中执行动态表达式语句: 前中后缀、Ognl、SpEL、Groovy、Jexl3
    宝,我今天CR了,C的什么R? 走过场的CR
    被监控轰炸了,不得不使出绝招
  • 原文地址:https://www.cnblogs.com/gossip/p/14422196.html
Copyright © 2011-2022 走看看