zoukankan      html  css  js  c++  java
  • Skywalking-11:Skywalking查询协议——案例分析

    以查询 Metrics 信息案例来分析 Skywalking 查询协议

    基本概述

    Skywalking 查询协议默认基于 GraphQL ,如果有需要也可以自定义扩展,提供一个实现了 org.apache.skywalking.oap.server.core.query.QueryModule 的查询模块即可。

    截取 Skywalking UI 发送的请求

    • 请求路径
    POST http://127.0.0.1:8080/graphql
    
    • 请求体
    {
      "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {
      readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {
        label
        values {
          values {value}
        }
      }}",
      "variables": {
        "duration": {
          "start": "2021-07-03 1320",
          "end": "2021-07-03 1321",
          "step": "MINUTE"
        },
        "condition": {
          "name": "instance_jvm_thread_runnable_thread_count",
          "entity": {
            "scope": "ServiceInstance",
            "serviceName": "business-zone::projectA",
            "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
            "normal": true
          }
        }
      }
    }
    
    • 响应
    {
      "data": {
        "readMetricsValues": {
          "values": {
            "values": [
              {
                "value": 22
              },
              {
                "value": 22
              }
            ]
          }
        }
      }
    }
    

    Skywalking 源码中找到对应 GraphQL 定义

    打开 oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol 目录,使用请求体中的模板关键字 readMetricsValues 搜索
    oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls 中找到对应的定义

    extend type Query {
        # etc...
        # Read time-series values in the duration of required metrics
        readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
        # etc...
    }
    

    输入参数定义

    input MetricsCondition {
        # Metrics name, which should be defined in OAL script
        # Such as:
        # Endpoint_avg = from(Endpoint.latency).avg()
        # Then, `Endpoint_avg`
        name: String!
        # Follow entity definition description.
        entity: Entity!
    }
    
    input Entity {
        # 1. scope=All, no name is required.
        # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
        # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
        #    serviceName/serviceInstanceName/endpointName is/are the source(s)
        #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
        #    set necessary names of sources and destinations.
        scope: Scope!
        serviceName: String
        # Normal service is the service having installed agent or metrics reported directly.
        # Unnormal service is conjectural service, usually detected by the agent.
        normal: Boolean
        serviceInstanceName: String
        endpointName: String
        destServiceName: String
        # Normal service is the service having installed agent or metrics reported directly.
        # Unnormal service is conjectural service, usually detected by the agent.
        destNormal: Boolean
        destServiceInstanceName: String
        destEndpointName: String
    }
    
    # The Duration defines the start and end time for each query operation.
    # Fields: `start` and `end`
    #   represents the time span. And each of them matches the step.
    #   ref https://www.ietf.org/rfc/rfc3339.txt
    #   The time formats are
    #       `SECOND` step: yyyy-MM-dd HHmmss
    #       `MINUTE` step: yyyy-MM-dd HHmm
    #       `HOUR` step: yyyy-MM-dd HH
    #       `DAY` step: yyyy-MM-dd
    #       `MONTH` step: yyyy-MM
    # Field: `step`
    #   represents the accurate time point.
    # e.g.
    #   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
    #   then
    #       metrics from the following time points expected
    #       2017-11-08 9:00 -> 2017-11-08 19:00
    #       there are 11 time points (hours) in the time span.
    input Duration {
        start: String!
        end: String!
        step: Step!
    }
    
    enum Step {
        DAY
        HOUR
        MINUTE
        SECOND
    }
    

    返回结果定义

    type MetricsValues {
        # Could be null if no label assigned in the query condition
        label: String
        # Values of this label value.
        values: IntValues
    }
    
    type IntValues {
        values: [KVInt!]!
    }
    
    type KVInt {
        id: ID!
        # This is the value, the caller must understand the Unit.
        # Such as:
        # 1. If ask for cpm metric, the unit and result should be count.
        # 2. If ask for response time (p99 or avg), the unit should be millisecond.
        value: Long!
    }
    

    使用 GraphQL IDEA 插件验证 Skywalking UI 的请求

    使用“ GraphQL 在 Skywalking 中的应用”一节中的方式,模仿“截取 Skywalking UI 发送的请求”一节中前端发送的请求

    • 请求模板
    query queryData($condition: MetricsCondition!, $duration: Duration!) {
        readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
            label values { values { id value }}
        }
    }
    
    • 请求参数
    {
      "duration": {
        "start": "2021-07-03 1400",
        "end": "2021-07-03 1401", 
        "step": "MINUTE"
      },
      "condition": {
        "name": "instance_jvm_thread_runnable_thread_count",
        "entity": {
          "scope": "ServiceInstance",
          "serviceName": "business-zone::projectA",
          "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
          "normal": true
        }
      }
    }
    
    • 响应结果
    {
      "data": {
        "readMetricsValues": {
          "values": {
            "values": [
              {
                "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
                "value": 22
              },
              {
                "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
                "value": 22
              }
            ]
          }
        }
      }
    }
    

    PS:如果不使用模板的方式,写查询语句是会有代码提示的

    query queryData {
        readMetricsValues(
            duration: {start: "2021-07-03 1400",end: "2021-07-03 1401", step: MINUTE},
            condition: {
                name: "instance_jvm_thread_runnable_thread_count",
                entity: {
                    scope: ServiceInstance,
                    serviceName: "business-zone::projectA",
                    serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                    normal: true
                }
            }
        ) {
            label values{ values{ id value }}
        }
    }
    

    如何将 GraphQL Schema 文件加载到程序中

    搜索 metrics-v2.graphqls ,在 oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java 找到加载代码

        // 初始化GraphQL引擎
        @Override
        public void prepare() throws ServiceNotProvidedException, ModuleStartException {
            GraphQLSchema schema = SchemaParser.newParser()
                                               // etc...
                                               .file("query-protocol/metrics-v2.graphqls")
                                               .resolvers(new MetricsQuery(getManager())) // MetricsQuery 是 com.coxautodev.graphql.tools.GraphQLQueryResolver 接口实现类
                                               // etc...
                                               .build()
                                               .makeExecutableSchema();
            this.graphQL = GraphQL.newGraphQL(schema).build();
        }
    

    org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery 类中,找到 readMetricsValues 方法

        /**
         * Read time-series values in the duration of required metrics
         */
        public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
            if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
                final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
                MetricsValues values = new MetricsValues();
                pointOfTimes.forEach(pointOfTime -> {
                    String id = pointOfTime.id(
                        condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
                    );
                    final KVInt kvInt = new KVInt();
                    kvInt.setId(id);
                    kvInt.setValue(0);
                    values.getValues().addKVInt(kvInt);
                });
                return values;
            }
            return getMetricsQueryService().readMetricsValues(condition, duration);
        }
    
        private MetricsQueryService getMetricsQueryService() {
            if (metricsQueryService == null) {
                this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                        .provider()
                                                        .getService(MetricsQueryService.class);
            }
            return metricsQueryService;
        }
    

    org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

        /**
         * Read time-series values in the duration of required metrics
         */
        public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
            return getMetricQueryDAO().readMetricsValues(
                condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
        }
    
        private IMetricsQueryDAO getMetricQueryDAO() {
            if (metricQueryDAO == null) {
                metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
            }
            return metricQueryDAO;
        }
    

    查看Extend storage文档, IMetricsQueryDAO 为指标查询数据访问对象

    # Implement all DAOs
    # Here is the list of all DAO interfaces in storage
    IServiceInventoryCacheDAO
    IServiceInstanceInventoryCacheDAO
    IEndpointInventoryCacheDAO
    INetworkAddressInventoryCacheDAO
    IBatchDAO
    StorageDAO
    IRegisterLockDAO
    ITopologyQueryDAO
    IMetricsQueryDAO
    ITraceQueryDAO
    IMetadataQueryDAO
    IAggregationQueryDAO
    IAlarmQueryDAO
    IHistoryDeleteDAO
    IMetricsDAO
    IRecordDAO
    IRegisterDAO
    ILogQueryDAO
    ITopNRecordsQueryDAO
    IBrowserLogQueryDAO
    

    通过类图,可以看出 IMetricsQueryDAO 实现类有 ES 、 ES7 、 InfluxDB 、 SQL 四种

    如何将 GraphQL 引擎注册到 Jetty 服务

        // 注册GraphQL查询处理器至Jetty服务
        @Override
        public void start() throws ServiceNotProvidedException, ModuleStartException {
            JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                                       .provider()
                                                       .getService(JettyHandlerRegister.class);
            service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
        }
    

    通过分析 GraphQLQueryProvider 该类,发现就是 QueryModule (查询模块)的 Provider (提供)类

    由此,也验证了在“基本概述”一节的说法:

    Skywalking 查询协议默认基于 GraphQL ,如果有需要也可以自定义扩展,提供一个实现了 org.apache.skywalking.oap.server.core.query.QueryModule 的查询模块即可。

        @Override
        public String name() {
            return "graphql";
        }
    
        @Override
        public Class<? extends ModuleDefine> module() {
            return QueryModule.class;
        }
    
    package org.apache.skywalking.oap.query.graphql;
    
    import com.google.gson.Gson;
    import com.google.gson.JsonArray;
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.reflect.TypeToken;
    import graphql.ExecutionInput;
    import graphql.ExecutionResult;
    import graphql.GraphQL;
    import graphql.GraphQLError;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.lang.reflect.Type;
    import java.util.List;
    import java.util.Map;
    import javax.servlet.http.HttpServletRequest;
    import lombok.RequiredArgsConstructor;
    import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
    import org.apache.skywalking.oap.server.library.util.CollectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @RequiredArgsConstructor
    public class GraphQLQueryHandler extends JettyJsonHandler {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);
    
        private static final String QUERY = "query";
        private static final String VARIABLES = "variables";
        private static final String DATA = "data";
        private static final String ERRORS = "errors";
        private static final String MESSAGE = "message";
    
        private final Gson gson = new Gson();
        private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
        }.getType();
    
        private final String path;
    
        private final GraphQL graphQL;
    
        @Override
        public String pathSpec() {
            return path;
        }
    
        @Override
        protected JsonElement doGet(HttpServletRequest req) {
            throw new UnsupportedOperationException("GraphQL only supports POST method");
        }
    
        @Override
        protected JsonElement doPost(HttpServletRequest req) throws IOException {
            BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
            String line;
            StringBuilder request = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                request.append(line);
            }
    
            JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
    
            return execute(requestJson.get(QUERY)
                                      .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
        }
    
        private JsonObject execute(String request, Map<String, Object> variables) {
            try {
                ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                              .query(request)
                                                              .variables(variables)
                                                              .build();
                // 使用GraphQL引擎获取查询结果
                ExecutionResult executionResult = graphQL.execute(executionInput);
                LOGGER.debug("Execution result is {}", executionResult);
                // 封装返回结果
                Object data = executionResult.getData();
                List<GraphQLError> errors = executionResult.getErrors();
                JsonObject jsonObject = new JsonObject();
                if (data != null) {
                    jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
                }
    
                if (CollectionUtils.isNotEmpty(errors)) {
                    JsonArray errorArray = new JsonArray();
                    errors.forEach(error -> {
                        JsonObject errorJson = new JsonObject();
                        errorJson.addProperty(MESSAGE, error.getMessage());
                        errorArray.add(errorJson);
                    });
                    jsonObject.add(ERRORS, errorArray);
                }
                return jsonObject;
            } catch (final Throwable e) {
                LOGGER.error(e.getMessage(), e);
                JsonObject jsonObject = new JsonObject();
                JsonArray errorArray = new JsonArray();
                JsonObject errorJson = new JsonObject();
                errorJson.addProperty(MESSAGE, e.getMessage());
                errorArray.add(errorJson);
                jsonObject.add(ERRORS, errorArray);
                return jsonObject;
            }
        }
    }
    

    Webapp 网关转发 GraphQL 请求至 OAP

    v8.6.0 及之前,网关都是 zuul , v8.7.0 及之后替换成了 Spring Cloud Gateway 。因为这块不是这篇文章的重点,这里不再赘述

    总结

    Skywalking 的查询协议默认使用通用性很强的 GraphQL 实现,客户端可以通过 GraphQL 协议很方便的选取自己需要的数据。
    对应 Skywalking 这种模式相对固定、变更不频繁的查询需求来说,还是挺适合的。

    参考文档

  • 相关阅读:
    Ubuntu更新Hostname和hosts
    Github 的其他用法
    Git与Github使用
    【实习】实习期间一些工具的使用
    【实习】实习第一周周报
    QT_string转char*
    QT_获取正在运行程序的进程id(判断程序是否正在运行)
    QT_获取运行进程所在目录路径_2
    QT_获取运行进程所在目录路径_1
    QT_强杀进程
  • 原文地址:https://www.cnblogs.com/switchvov/p/15375443.html
Copyright © 2011-2022 走看看