zoukankan      html  css  js  c++  java
  • skywalking后端处理业务逻辑的梳理

     接下来会进入下面的代理处理TraceSegment

    接下来会调用到TraceAnalyzer的doAnalysis方法,代码如下

    public void doAnalysis(SegmentObject segmentObject) {
            if (segmentObject.getSpansList().size() == 0) {
                return;
            }
    
            createSpanListeners();
    
            try {
                notifySegmentListener(segmentObject);
    
                segmentObject.getSpansList().forEach(spanObject -> {
                    if (spanObject.getSpanId() == 0) {
                        notifyFirstListener(spanObject, segmentObject);
                    }
    
                    if (SpanType.Exit.equals(spanObject.getSpanType())) {
                        notifyExitListener(spanObject, segmentObject);
                    } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                        notifyEntryListener(spanObject, segmentObject);
                    } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                        notifyLocalListener(spanObject, segmentObject);
                    } else {
                        log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                                  .name());
                    }
                });
    
                notifyListenerToBuild();
            } catch (Throwable e) {
                log.error(e.getMessage(), e);
            }
        }

    我们首先来看方法的第一个业务逻辑createSpanListeners(),改方法主要产生处于TraceSegment。以及各类span的listener,默认情况下产生的listener情况如下

     默认情况下产生上面三个listener

    1、SegmentAnalysisListener主要是处理TraceSegment,将整个TraceSegment转行成segement对象存储到数据库中

    2、MultiScopesAnalysisListener主要用来处理TraceSegment中的span,依据span的不同类型进行单独处理,然后依据不同的span类型产生不同的指标信息

    这是收到的skywalking的出

    skywalking后端oap集群收到TraceSegment的处理信息之后,会进行处理,这里处理会对TraceSegment中保存的span进行分类,span分为下面的几种类型

    enum Point {
            Entry, Exit, Local, First, Segment
        }

    每一种类型会对应一个listen处理器与之对应,经过处理器处理之后,在延伸到后续的流程和操作

    五个listen的类型如下面所示

    代码中创建了三个listener情况如下

     接下来我们分析完成了createSpanListeners();接下来是执行

    notifySegmentListener(segmentObject);我们来看下改方法是如何进行处理的

     private void notifySegmentListener(SegmentObject segmentObject) {
            analysisListeners.forEach(listener -> {
                if (listener.containsPoint(AnalysisListener.Point.Segment)) {
                    ((SegmentListener) listener).parseSegment(segmentObject);
                }能够
            });
        }

    在上面创建的三个listener中只有SegmentAnalysisListener能够处理TraceSegment,我们来看下后面的代码

     @Override
        public void parseSegment(SegmentObject segmentObject) {
            segment.setTraceId(segmentObject.getTraceId());
            segmentObject.getSpansList().forEach(span -> {
                if (startTimestamp == 0 || startTimestamp > span.getStartTime()) {
                    startTimestamp = span.getStartTime();
                }
                if (span.getEndTime() > endTimestamp) {
                    endTimestamp = span.getEndTime();
                }
                isError = isError || segmentStatusAnalyzer.isError(span);
                appendSearchableTags(span);
            });
            final long accurateDuration = endTimestamp - startTimestamp;
            duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration;
    
            if (sampleStatus.equals(SAMPLE_STATUS.UNKNOWN) || sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
                if (sampler.shouldSample(segmentObject.getTraceId())) {
                    sampleStatus = SAMPLE_STATUS.SAMPLED;
                } else if (isError && forceSampleErrorSegment) {
                    sampleStatus = SAMPLE_STATUS.SAMPLED;
                } else if (traceLatencyThresholdsAndWatcher.shouldSample(duration)) {
                    sampleStatus = SAMPLE_STATUS.SAMPLED;
                } else {
                    sampleStatus = SAMPLE_STATUS.IGNORE;
                }
            }
        }

    我们来重点看下SegmentAnalysisListener下的parseSegment的方法的流程,方法传入的参数segmentObject的结构体如下

    traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
    traceSegmentId: "ec5de79a4a8848e88146ebf696d157dc.73.16092665874590000"
    spans {
      parentSpanId: -1
      startTime: 1609266587459
      endTime: 1609266589471
      refs {
        traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
        parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
        parentSpanId: 1
        parentService: "DEMO-webapp"
        parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
        parentEndpoint: "{GET}/hello/{words}"
        networkAddressUsedAtPeer: "169.254.122.166:20880"
      }
      operationName: "com.xxx.HelloService.say(String)"
      spanLayer: RPCFramework
      componentId: 3
      tags {
        key: "url"
        value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
      }
    }
    service: "DEMO-provider"
    serviceInstance: "c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80"

    这里首先处理的是dubbo-provider上传的TraceSegment,产生的是一个EntrySpan,改EntrySpan中管理了dubbo-webapp的TraceSegment

     

     

     这里在处理的时候如果当前的TraceSegment中存在5个span,那么当前TraceSegment的开始时间等于5个span中最小的starttime时间,最大时间为5个span中最大的endTime的时间,如果当前的TraceSegment中只要存在一个span的熟悉中为iserror,则改TraceSegment就为false

     这个代码是判断当前的TraceSegment是否被采用处理了,如果被采样了就可以存储到数据库中,如果没有采用改TraceSegment就丢弃不用做任何处理

     notifySegmentListener(segmentObject);本质上就是将后端上传的TraceSegment转化成一个segment对象

    接下来继续执行

     取出当前TraceSegment中的span列表,然后对没有span进行判断,判断当前的span是EntrySpan、localspan、exitspan,依据不同的span传递给不同的listener进行处理

    这里当前的TraceSegment中只有一个span

    span的对象为

    parentSpanId: -1
    startTime: 1609266587459
    endTime: 1609266589471
    refs {
      traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
      parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
      parentSpanId: 1
      parentService: "DEMO-webapp"
      parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
      parentEndpoint: "{GET}/hello/{words}"
      networkAddressUsedAtPeer: "169.254.122.166:20880"
    }
    operationName: "com.xxx.HelloService.say(String)"
    spanLayer: RPCFramework
    componentId: 3
    tags {
      key: "url"
      value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
    }

    因为当前的span为EntrySpan,对应的spanID值就是0,会被下面的listener进行处理

      notifyFirstListener(spanObject, segmentObject);底层会调用SegmentAnalysisListener的parseFirst方法,我们来看下改方法的业务逻辑

    @Override
        public void parseFirst(SpanObject span, SegmentObject segmentObject) {
            if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
                return;
            }
    
            if (StringUtil.isEmpty(serviceId)) {
                serviceName = namingControl.formatServiceName(segmentObject.getService());
                serviceId = IDManager.ServiceID.buildId(
                    serviceName,
                    NodeType.Normal
                );
            }
    
            long timeBucket = TimeBucket.getRecordTimeBucket(startTimestamp);
    
            segment.setSegmentId(segmentObject.getTraceSegmentId());
            segment.setServiceId(serviceId);
            segment.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
                serviceId,
                namingControl.formatInstanceName(segmentObject.getServiceInstance())
            ));
            segment.setLatency(duration);
            segment.setStartTime(startTimestamp);
            segment.setTimeBucket(timeBucket);
            segment.setEndTime(endTimestamp);
            segment.setIsError(BooleanUtils.booleanToValue(isError));
            segment.setDataBinary(segmentObject.toByteArray());
            segment.setVersion(3);
    
            endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
            endpointId = IDManager.EndpointID.buildId(
                serviceId,
                endpointName
            );
        }

    1、当前service: "DEMO-provider"我们需要将"DEMO-provider"转行成对应应用id,存储到segment对象中

    2、同理应用实例serviceInstance: "c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80"我们也要转化成对应的应用实例id存储到segement对象中

     

    segement中的timeBucket字段就是当前TraceSegment中span集合中,比较各个span的开始时间,取出最小的时间,最小的时间作为segement的timeBucket

     接下来设置segemnt的开始时间,开始时间就是比较各个span的开始时间,取出最小的开始时间作为segemnt的开始时间

    segemnt的结束时间结束始时间就是比较各个span的结束时间,取出最大的结束时间作为segemnt的结束时间

    整个segemnt的持续时间就是开始时间减去结束时间,此外还将整个segemeOBj对象转化成二进制文件存储在字段setDataBinary中

    经过整个parseFirst方法之后,整个segemnt对象的结构为

     接下来我们继续执行下面的方法

     当前的span为EntrySpan,接下来会执行 notifyEntryListener(spanObject, segmentObject);方法

     

     这里MultiScopesAnalysisListener和SegmentAnalysisListener都会对整个EntrySpan进行处理,我们来分析下下面的代码

    首先来看下MultiScopesAnalysisListener对parseEntry的方法对EntrySpan的处理

     改方法的主要作用是获得当前EntrySpan关联的上游的TraceSegmentRef对象,然后将TraceSegmentRef对象封装到SourceBuilder对象中

    我们先来看下当前EntrySpan中获得的TraceSegmentRef对象的信息如下

    traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
    parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
    parentSpanId: 1
    parentService: "DEMO-webapp"
    parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
    parentEndpoint: "{GET}/hello/{words}"
    networkAddressUsedAtPeer: "169.254.122.166:20880"

     在 SourceBuilder 中记录了上下游两个系统的基础信息,核心字段如下所示:

    // 上游系统的 service 信息、serviceInstance 信息以及 Endpoint 信息
    
    @Getter @Setter private int sourceServiceId;
    
    @Getter @Setter private String sourceServiceName;
    
    @Getter @Setter private int sourceServiceInstanceId;
    
    @Getter @Setter private String sourceServiceInstanceName;
    
    @Getter @Setter private int sourceEndpointId;
    
    @Getter @Setter private String sourceEndpointName;
    
    // 下游系统的 service 信息、serviceInstance 信息以及 Endpoint 信息
    
    @Getter @Setter private int destServiceId;
    
    @Getter @Setter private String destServiceName;
    
    @Getter @Setter private int destServiceInstanceId;
    
    @Getter @Setter private String destServiceInstanceName;
    
    @Getter @Setter private int destEndpointId;
    
    @Getter @Setter private String destEndpointName;
    
    // 当前系统的组件类型
    
    @Getter @Setter private int componentId;
    
    // 在当前系统中的耗时
    
    @Getter @Setter private int latency;
    
    // 当前系统是否发生Error
    
    @Getter @Setter private boolean status;
    
    @Getter @Setter private int responseCode; // 默认为0
    
    @Getter @Setter private RequestType type; // 请求类型
    
    // 调用关系中的角色,是调用方(Client)、被调用方(Server)还是代理(Proxy)
    
    @Getter @Setter private DetectPoint detectPoint; 
    
    // TraceSegment 起始时间所在分钟级时间窗口
    
    @Getter @Setter private long timeBucket;

    这里有一个关键点

     就是如果当前的EntrySpan中没有TraceSegmentRef没有父亲入口,比如dubbo-webapp入口的EntrySpan,在http请求头信息是没有任何信息的,获得的TraceSegmentRef就是为null,这里代码中设置当前的span的sourceBuilder中的  sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);,public static final String USER_ENDPOINT_NAME = "User";的函数表示就是浏览器或者postman等入口调用了dubbo-webapp,作为dubbo-webapp调用的入口

    这里dubbo-provider的执行的是else条件,入口的方法就是dubbo-webapp的方法

     

     这里设置sourceBuilder的上游节点内容的时候,需要判断上面有MQ类型还是一般应用,这里是一般应用执行下面的else的方法

     上游节点的信息设置完成之后,接下来是设置下游的节点信息

     

     通过这个sourcebuild对象就将当前的EntrySpan和dubb-webapp中产线的exitspan关联起来了,

    接下来将创建好的sourceBuilder添加到集合entrySourceBuilders中,后续会进行处理

    这里TraceSegment中的span集合中如果存在5个EntrySpan,就会产生5个sourceBuilder对象

    接下来继续执行 parseLogicEndpoints(span, segmentObject);

     我们来看看 parseLogicEndpoints(span, segmentObject);具体的业务代码,改方法的具体业务逻辑后面再进行分析

    MultiScopesAnalysisListener的parseEntry执行完成之后,接下来执行NetworkAddressAliasMappingListener类的parseEntry方法

    我们来看下改方法的具体业务逻辑,就是将当前EntrySpan所在的网络、应用名称、应用实例封装成一个networkAddressAliasSetup对象,然后将这个对象

    传递给sourceReceiver进行处理

     整个networkAddressAliasSetup的结构如下

     SegmentAnalysisListener的parseEntry方法就是将应用名称应用实例名称转化成对于的id

     

     经过上面的转化之后执行notifyListenerToBuild()方法

    private void notifyListenerToBuild() {
    analysisListeners.forEach(AnalysisListener::build);
    }执行各个listener的build方法,主要是执行SegmentAnalysisListener的build方法和MultiScopesAnalysisListener的build方法

    我们先介绍SegmentAnalysisListener的build方法,然后在介绍MultiScopesAnalysisListener的build方法

     收到判断当前的segment是否是被采样忽略的,如果是被采样忽略的

    skywalking后端采样率要设置为一样的原因
    一个全局的请求,在dubbo-webapp产生了一个TraceSegmentA,在dubbo-dubbo产生了一个TraceSegmentB,需要有这两个TraceSegment才能还原这一次Trace
    TraceSegmentA传递给后端的oap集群的A节点处理,TraceSegmentB传递给后端的oap集群的B节点处理,
    如果A节点和B节点在后端的采样率设置不一致,就可能导致TraceSegmentA被处理了存储在es中,TraceSegmentB被忽略直接丢弃,导致无法还原当前请求的trace情况
    所以oap集群后端每个节点的采样率必须设置为一致

      收到判断当前的segment是否是被采样忽略的,如果是被采样忽略的就直接忽略,如果不是继续执行后续的代码

    设置segemnt中endpointId的值,这里endpointID是这样计算出来的

            endpointId = IDManager.EndpointID.buildId(
                serviceId,
                endpointName
            )

    当前的应用为dubbo-provider对于的service_id的值为REVNTy13ZWJhcHA=.1

    endpointName为当前span对应的  operationName: "com.xxx.HelloService.say(String)"

     然后通过上面的这两个参数获得对应的值

             */
            public static String buildId(String serviceId, String endpointName) {
                return serviceId
                    + Const.ID_CONNECTOR
                    + encode(endpointName);
            }

    算出来了之后 

    经过处理之后整个TraceSegment的结构如下

     2、预构建 :替换url peer端口、servicenam等信息

    1、接下来将TraceSegment中各个span的类型分配给不同的spanlistener进行处理

     接下来对每一个span进行解析

    parentSpanId: -1
    startTime: 1609266200857
    endTime: 1609266202857
    refs {
    traceId: "f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120001"
    parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120000"
    parentSpanId: 1
    parentService: "DEMO-webapp"
    parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
    parentEndpoint: "{GET}/hello/{words}"
    networkAddressUsedAtPeer: "169.254.122.166:20880"
    }
    operationName: "com.xxx.HelloService.say(String)"
    spanLayer: RPCFramework
    componentId: 3
    tags {
    key: "url"
    value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
    }

    依据不同的span类型进行解析,传递给不同的listen对象,然后产生不同的数据指标

  • 相关阅读:
    shell数组(产生不同的随机数)
    统计服务连接状况
    子网掩码与子网划分
    oracle 12g sqlplus安装
    MySQL的备份和还原
    mysql日志
    mysql用户和权限管理
    mysql show
    CentOS Linux解决Device eth0 does not seem to be present
    mysqldump命令详解(转载)
  • 原文地址:https://www.cnblogs.com/kebibuluan/p/14211288.html
Copyright © 2011-2022 走看看