zoukankan      html  css  js  c++  java
  • turbine是怎么收集指标数据的

    turbine是怎么收集指标数据的

    我们通过spring cloud图形化dashboard是如何实现指标的收集展示的知道了,图形化的指标是从turbine获取到指标数据的。那么turbine的数据是从哪里来的呢?

    1、数据来源

    我们通过url:http://localhost:10000/turbine.stream?cluster=default可以获取到指标的json数据。那么指标数据又是从何处获取到的。
    答案是:从各个服务的/manage/hystrix.stream端点获取的

    2、turbine架构设计

    turbine官方的github地址:
    https://github.com/Netflix/turbine/wiki
    可以找到turbine的架构设计
    image_1c8cnao7u1rt5e2p1jhb1bj716b714.png-146.7kB

    详细信息参考
    https://github.com/Netflix/Turbine/wiki/Design-And-Architecture-(1.x)

    说明:turbine启动的时候,会去连接需要监控的主机,建立起监听,每一个实例会有一个监听。当实例监听从各个服务获取到数据的时候,会将数据填充到派发器dispatcher中,由派发器将数据输出到各个客户端。

    image_1c8cno6j71v56vab17dska015741h.png-121.6kB

    3、源码阅读

    turbine的实现在turbine核心包下
    com.netflix.turbine:turbine-core

    在该包下,可以找到几个关键的类
    InstanceMonitorHandlerQueueTupleTurbineDataDispatcherTurbineStreamServlet

    我们启动调试的时候,可以看到
    image_1c8co5shg1sqf16le151179ab6d1u.png-190.5kB

    实例的url其实是指向具体需要监控的实例的端点,即
    http://sheng:8088/manage/hystrix.stream
    查看这个链接我们可以看到
    image_1c8co9jkt1gnco5b108p1bu4gds2b.png-96.4kB

    InstanceMonitor启动监听

    public void startMonitor() throws Exception {
            // This is the only state that we allow startMonitor to proceed in
            if (monitorState.get() != State.NotStarted) {
                return;
            }
    
            taskFuture = ThreadPool.submit(new Callable<Void>() {
    
                @Override
                public Void call() throws Exception {
    
                    try {
                        //初始化,连接到具体实例上
                        init();
                        monitorState.set(State.Running);
                        while(monitorState.get() == State.Running) {
                            //关键代码
                            doWork();
                        }
                    } catch(Throwable t) {
                        logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t);
                    } finally {
                        if (monitorState.get() == State.Running) {
                            monitorState.set(State.StopRequested);
                        }
                        cleanup();
                        monitorState.set(State.CleanedUp);
                    }
                    return null;
                }
            });
        }
    

        private void init() throws Exception {
    
            HttpGet httpget = new HttpGet(url);
    
            HttpResponse response = gatewayHttpClient.getHttpClient().execute(httpget);
    
            HttpEntity entity = response.getEntity();
            InputStream is = entity.getContent();
            //初始化一个输入流
            reader = new BufferedReader(new InputStreamReader(is));
    
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                // this is unexpected. We probably have the wrong endpoint. Print the response out for debugging and give up here.
                List<String> responseMessage = IOUtils.readLines(reader);
                logger.error("Could not initiate connection to host, giving up: " + responseMessage);
                throw new MisconfiguredHostException(responseMessage.toString());
            }
        }
    

    dowork()方法做了什么呢

        private void doWork() throws Exception {
    
            DataFromSingleInstance instanceData = null;
    
            //获取实例数据
            instanceData = getNextStatsData();
            if(instanceData == null) {
                return;
            } else {
                lastEventUpdateTime.set(System.currentTimeMillis());
            }
    
            List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>();
            list.add(instanceData);
    
            /* send to all handlers */
            //将获取到的数据添加到dispatcher中
            boolean continueRunning = dispatcher.pushData(getStatsInstance(), list);
            if(!continueRunning) {
                logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster());
                monitorState.set(State.StopRequested);
                return;
            }
        }
    

    getNextStatsData读取数据
    image_1c8cor2ih1cb714tc1b5m8mk7ks2o.png-58kB


    那么派发器是什么呢,它的实现查看TurbineDataDispatcher
    查看它的pushData方法
    发现调用的是tuple.pushData(statsData);tuple其实就像一个管道,查看HandlerQueueTuplepushData方法

        public void pushData(K data) {
            if (stopped) {
                return;
            }
            boolean success = queue.writeEvent(data);
            if (isCritical()) {
                // track stats
                if (success) {
                    counter.increment(Type.EVENT_PROCESSED);
                } else {
                    counter.increment(Type.EVENT_DISCARDED);
                }
            }
        }
    

    看到queue.writeEvent(data)、往队列里写数据
    这个队列又是什么呢?
    其实就是一个事件队列EventQueue,查看它的写事件方法

      public boolean writeEvent(T event) {
            if (count.get() > maxCapacity) {  // approx check for capacity
                return false;
            }
            count.incrementAndGet();
            queue.add(event);
            return true;
        }
    

    如果队列中的长度大于maxCapacity,将不会再往队列里填充数据。


    当客户端连接上的时候,queue就会被消费。如果客户端没有连接上的时候,queue读出来,经过一系列的操作会写回queue中,直到队列满了就不在写了。

    1、当没有客户端连接上的时候
    image_1c8evuokc1u8scad10cuef11jm413.png-119.7kB

    eventHandler经过一些列的处理,数据会被写回到queue中

    2、当有客户端连上的时候,假设我们通过浏览器地址栏输入了
    http://localhost:10000/turbine.stream?cluster=default
    此时
    我们看到eventHandler为TurbineStreamingConnection,见下图
    image_1c8evlp0c1n4p491rcebcn8htm.png-159.3kB

    handlData()就变成了TurbineStreamingConnection中的方法

        public void handleData(Collection<T> data) {
            
            if (stopMonitoring) {
                // we have been stopped so don't try handling data
                return;
            }
            //将数据写到steamHandler中
            writeToStream(data);
        }
    

    writeToStream()中有个关键的操作streamHandler.writeData(jsonStringForDataHash)
    image_1c8f022frk5f198mk0k3g31rah1g.png-90.2kB

    writeData()方法就可以将数据写到response中
    image_1c8f049sgvmpaq79b813ko1nsp1t.png-85kB

    客户端访问http://localhost:10000/turbine.stream?cluster=default的时候,其实就是通过TurbineStreamServlet获取到响应结果的。

  • 相关阅读:
    MySQL之PXC集群
    MySQL大表查询未走索引异常分析
    Redis场景应用之UA池
    MySQL日志剖析
    Redis场景应用之排行榜
    MySQL B+树索引解析
    Redisson分布式锁剖析
    关键字替代符号C++
    并查集按秩合并
    POJ3048
  • 原文地址:https://www.cnblogs.com/liangzs/p/8550275.html
Copyright © 2011-2022 走看看