zoukankan      html  css  js  c++  java
  • 浅谈skywalking的spring-webflux-plugin

    本文参考原文-http://bjbsair.com/2020-03-22/tech-info/5100.html

    本文主要研究一下skywalking的spring-webflux-plugin

    聊聊skywalking的spring-webflux-plugin

    DispatcherHandlerInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

    public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[0];  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[]{  
                new InstanceMethodsInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {  
                        return named("handle");  
                    }  
    ​  
                    @Override  
                    public String getMethodsInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
                    }  
    ​  
                    @Override  
                    public boolean isOverrideArgs() {  
                        return false;  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.reactive.DispatcherHandler");  
        }  
    }
    
    • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

    DispatcherHandlerHandleMethodInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

    public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
        private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
    ​  
        @Override  
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                 MethodInterceptResult result) throws Throwable {  
    ​  
        }  
    ​  
        @Override  
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                  Object ret) throws Throwable {  
            EnhancedInstance instance = getInstance(allArguments[0]);  
    ​  
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
    ​  
            ContextCarrier carrier = new ContextCarrier();  
            CarrierItem next = carrier.items();  
            HttpHeaders headers = exchange.getRequest().getHeaders();  
            while (next.hasNext()) {  
                next = next.next();  
                List<String> header = headers.get(next.getHeadKey());  
                if (header != null && header.size() > 0) {  
                    next.setHeadValue(header.get(0));  
                }  
            }  
    ​  
            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
            span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
            SpanLayer.asHttp(span);  
            Tags.URL.set(span, exchange.getRequest().getURI().toString());  
            HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
            instance.setSkyWalkingDynamicField(ContextManager.capture());  
            span.prepareForAsync();  
            ContextManager.stopSpan(span);  
    ​  
            return ((Mono) ret).doFinally(s -> {  
                try {  
                    Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
                    if (pathPattern != null) {  
                        span.setOperationName(((PathPattern) pathPattern).getPatternString());  
                    }  
                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
                    if (httpStatus != null) {  
                        Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
                        if (httpStatus.isError()) {  
                            span.errorOccurred();  
                        }  
                    }  
                } finally {  
                    span.asyncFinish();  
                }  
            });  
    ​  
        }  
    ​  
        @Override  
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
                                          Class<?>[] argumentsTypes, Throwable t) {  
        }  
    ​  
        public static EnhancedInstance getInstance(Object o) {  
            EnhancedInstance instance = null;  
            if (o instanceof DefaultServerWebExchange) {  
                instance = (EnhancedInstance) o;  
            } else if (o instanceof ServerWebExchangeDecorator) {  
                ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
                return getInstance(delegate);  
            }  
            return instance;  
        }  
    ​  
    }
    
    • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

    ServerWebExchangeInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

    public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[]{  
                new ConstructorInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {  
                        return any();  
                    }  
    ​  
                    @Override  
                    public String getConstructorInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[0];  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
        }  
    }
    
    • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    ServerWebExchangeConstructorInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

    public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
        @Override  
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
        }  
    }
    
    • ServerWebExchangeConstructorInterceptor目前还是空实现

    小结

    DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    doc

    本文主要研究一下skywalking的spring-webflux-plugin

    聊聊skywalking的spring-webflux-plugin

    DispatcherHandlerInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

    public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[0];  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[]{  
                new InstanceMethodsInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {  
                        return named("handle");  
                    }  
    ​  
                    @Override  
                    public String getMethodsInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
                    }  
    ​  
                    @Override  
                    public boolean isOverrideArgs() {  
                        return false;  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.reactive.DispatcherHandler");  
        }  
    }
    
    • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

    DispatcherHandlerHandleMethodInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

    public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
        private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
    ​  
        @Override  
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                 MethodInterceptResult result) throws Throwable {  
    ​  
        }  
    ​  
        @Override  
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                  Object ret) throws Throwable {  
            EnhancedInstance instance = getInstance(allArguments[0]);  
    ​  
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
    ​  
            ContextCarrier carrier = new ContextCarrier();  
            CarrierItem next = carrier.items();  
            HttpHeaders headers = exchange.getRequest().getHeaders();  
            while (next.hasNext()) {  
                next = next.next();  
                List<String> header = headers.get(next.getHeadKey());  
                if (header != null && header.size() > 0) {  
                    next.setHeadValue(header.get(0));  
                }  
            }  
    ​  
            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
            span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
            SpanLayer.asHttp(span);  
            Tags.URL.set(span, exchange.getRequest().getURI().toString());  
            HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
            instance.setSkyWalkingDynamicField(ContextManager.capture());  
            span.prepareForAsync();  
            ContextManager.stopSpan(span);  
    ​  
            return ((Mono) ret).doFinally(s -> {  
                try {  
                    Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
                    if (pathPattern != null) {  
                        span.setOperationName(((PathPattern) pathPattern).getPatternString());  
                    }  
                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
                    if (httpStatus != null) {  
                        Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
                        if (httpStatus.isError()) {  
                            span.errorOccurred();  
                        }  
                    }  
                } finally {  
                    span.asyncFinish();  
                }  
            });  
    ​  
        }  
    ​  
        @Override  
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
                                          Class<?>[] argumentsTypes, Throwable t) {  
        }  
    ​  
        public static EnhancedInstance getInstance(Object o) {  
            EnhancedInstance instance = null;  
            if (o instanceof DefaultServerWebExchange) {  
                instance = (EnhancedInstance) o;  
            } else if (o instanceof ServerWebExchangeDecorator) {  
                ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
                return getInstance(delegate);  
            }  
            return instance;  
        }  
    ​  
    }
    
    • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

    ServerWebExchangeInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

    public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[]{  
                new ConstructorInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {  
                        return any();  
                    }  
    ​  
                    @Override  
                    public String getConstructorInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[0];  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
        }  
    }
    
    • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    ServerWebExchangeConstructorInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

    public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
        @Override  
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
        }  
    }
    
    • ServerWebExchangeConstructorInterceptor目前还是空实现

    小结

    DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    doc

    本文主要研究一下skywalking的spring-webflux-plugin

    聊聊skywalking的spring-webflux-plugin

    DispatcherHandlerInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

    public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[0];  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[]{  
                new InstanceMethodsInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {  
                        return named("handle");  
                    }  
    ​  
                    @Override  
                    public String getMethodsInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
                    }  
    ​  
                    @Override  
                    public boolean isOverrideArgs() {  
                        return false;  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.reactive.DispatcherHandler");  
        }  
    }
    
    • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

    DispatcherHandlerHandleMethodInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

    public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
        private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
    ​  
        @Override  
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                 MethodInterceptResult result) throws Throwable {  
    ​  
        }  
    ​  
        @Override  
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                  Object ret) throws Throwable {  
            EnhancedInstance instance = getInstance(allArguments[0]);  
    ​  
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
    ​  
            ContextCarrier carrier = new ContextCarrier();  
            CarrierItem next = carrier.items();  
            HttpHeaders headers = exchange.getRequest().getHeaders();  
            while (next.hasNext()) {  
                next = next.next();  
                List<String> header = headers.get(next.getHeadKey());  
                if (header != null && header.size() > 0) {  
                    next.setHeadValue(header.get(0));  
                }  
            }  
    ​  
            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
            span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
            SpanLayer.asHttp(span);  
            Tags.URL.set(span, exchange.getRequest().getURI().toString());  
            HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
            instance.setSkyWalkingDynamicField(ContextManager.capture());  
            span.prepareForAsync();  
            ContextManager.stopSpan(span);  
    ​  
            return ((Mono) ret).doFinally(s -> {  
                try {  
                    Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
                    if (pathPattern != null) {  
                        span.setOperationName(((PathPattern) pathPattern).getPatternString());  
                    }  
                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
                    if (httpStatus != null) {  
                        Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
                        if (httpStatus.isError()) {  
                            span.errorOccurred();  
                        }  
                    }  
                } finally {  
                    span.asyncFinish();  
                }  
            });  
    ​  
        }  
    ​  
        @Override  
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
                                          Class<?>[] argumentsTypes, Throwable t) {  
        }  
    ​  
        public static EnhancedInstance getInstance(Object o) {  
            EnhancedInstance instance = null;  
            if (o instanceof DefaultServerWebExchange) {  
                instance = (EnhancedInstance) o;  
            } else if (o instanceof ServerWebExchangeDecorator) {  
                ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
                return getInstance(delegate);  
            }  
            return instance;  
        }  
    ​  
    }
    
    • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

    ServerWebExchangeInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

    public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[]{  
                new ConstructorInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {  
                        return any();  
                    }  
    ​  
                    @Override  
                    public String getConstructorInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[0];  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
        }  
    }
    
    • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    ServerWebExchangeConstructorInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

    public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
        @Override  
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
        }  
    }
    
    • ServerWebExchangeConstructorInterceptor目前还是空实现

    小结

    DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    doc

    本文主要研究一下skywalking的spring-webflux-plugin

    聊聊skywalking的spring-webflux-plugin

    DispatcherHandlerInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

    public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[0];  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[]{  
                new InstanceMethodsInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {  
                        return named("handle");  
                    }  
    ​  
                    @Override  
                    public String getMethodsInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
                    }  
    ​  
                    @Override  
                    public boolean isOverrideArgs() {  
                        return false;  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.reactive.DispatcherHandler");  
        }  
    }
    
    • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

    DispatcherHandlerHandleMethodInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

    public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
        private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
    ​  
        @Override  
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                 MethodInterceptResult result) throws Throwable {  
    ​  
        }  
    ​  
        @Override  
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                  Object ret) throws Throwable {  
            EnhancedInstance instance = getInstance(allArguments[0]);  
    ​  
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
    ​  
            ContextCarrier carrier = new ContextCarrier();  
            CarrierItem next = carrier.items();  
            HttpHeaders headers = exchange.getRequest().getHeaders();  
            while (next.hasNext()) {  
                next = next.next();  
                List<String> header = headers.get(next.getHeadKey());  
                if (header != null && header.size() > 0) {  
                    next.setHeadValue(header.get(0));  
                }  
            }  
    ​  
            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
            span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
            SpanLayer.asHttp(span);  
            Tags.URL.set(span, exchange.getRequest().getURI().toString());  
            HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
            instance.setSkyWalkingDynamicField(ContextManager.capture());  
            span.prepareForAsync();  
            ContextManager.stopSpan(span);  
    ​  
            return ((Mono) ret).doFinally(s -> {  
                try {  
                    Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
                    if (pathPattern != null) {  
                        span.setOperationName(((PathPattern) pathPattern).getPatternString());  
                    }  
                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
                    if (httpStatus != null) {  
                        Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
                        if (httpStatus.isError()) {  
                            span.errorOccurred();  
                        }  
                    }  
                } finally {  
                    span.asyncFinish();  
                }  
            });  
    ​  
        }  
    ​  
        @Override  
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
                                          Class<?>[] argumentsTypes, Throwable t) {  
        }  
    ​  
        public static EnhancedInstance getInstance(Object o) {  
            EnhancedInstance instance = null;  
            if (o instanceof DefaultServerWebExchange) {  
                instance = (EnhancedInstance) o;  
            } else if (o instanceof ServerWebExchangeDecorator) {  
                ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
                return getInstance(delegate);  
            }  
            return instance;  
        }  
    ​  
    }
    
    • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

    ServerWebExchangeInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

    public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[]{  
                new ConstructorInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {  
                        return any();  
                    }  
    ​  
                    @Override  
                    public String getConstructorInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[0];  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
        }  
    }
    
    • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    ServerWebExchangeConstructorInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

    public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
        @Override  
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
        }  
    }
    
    • ServerWebExchangeConstructorInterceptor目前还是空实现

    小结

    DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    doc

    本文主要研究一下skywalking的spring-webflux-plugin

    聊聊skywalking的spring-webflux-plugin

    DispatcherHandlerInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

    public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[0];  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[]{  
                new InstanceMethodsInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {  
                        return named("handle");  
                    }  
    ​  
                    @Override  
                    public String getMethodsInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";  
                    }  
    ​  
                    @Override  
                    public boolean isOverrideArgs() {  
                        return false;  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.reactive.DispatcherHandler");  
        }  
    }
    
    • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

    DispatcherHandlerHandleMethodInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

    public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {  
        private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";  
    ​  
        @Override  
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                 MethodInterceptResult result) throws Throwable {  
    ​  
        }  
    ​  
        @Override  
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,  
                                  Object ret) throws Throwable {  
            EnhancedInstance instance = getInstance(allArguments[0]);  
    ​  
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];  
    ​  
            ContextCarrier carrier = new ContextCarrier();  
            CarrierItem next = carrier.items();  
            HttpHeaders headers = exchange.getRequest().getHeaders();  
            while (next.hasNext()) {  
                next = next.next();  
                List<String> header = headers.get(next.getHeadKey());  
                if (header != null && header.size() > 0) {  
                    next.setHeadValue(header.get(0));  
                }  
            }  
    ​  
            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);  
            span.setComponent(ComponentsDefine.SPRING_WEBFLUX);  
            SpanLayer.asHttp(span);  
            Tags.URL.set(span, exchange.getRequest().getURI().toString());  
            HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());  
            instance.setSkyWalkingDynamicField(ContextManager.capture());  
            span.prepareForAsync();  
            ContextManager.stopSpan(span);  
    ​  
            return ((Mono) ret).doFinally(s -> {  
                try {  
                    Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);  
                    if (pathPattern != null) {  
                        span.setOperationName(((PathPattern) pathPattern).getPatternString());  
                    }  
                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();  
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support  
                    if (httpStatus != null) {  
                        Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));  
                        if (httpStatus.isError()) {  
                            span.errorOccurred();  
                        }  
                    }  
                } finally {  
                    span.asyncFinish();  
                }  
            });  
    ​  
        }  
    ​  
        @Override  
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,  
                                          Class<?>[] argumentsTypes, Throwable t) {  
        }  
    ​  
        public static EnhancedInstance getInstance(Object o) {  
            EnhancedInstance instance = null;  
            if (o instanceof DefaultServerWebExchange) {  
                instance = (EnhancedInstance) o;  
            } else if (o instanceof ServerWebExchangeDecorator) {  
                ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();  
                return getInstance(delegate);  
            }  
            return instance;  
        }  
    ​  
    }
    
    • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

    ServerWebExchangeInstrumentation

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

    public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {  
        @Override  
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {  
            return new ConstructorInterceptPoint[]{  
                new ConstructorInterceptPoint() {  
                    @Override  
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {  
                        return any();  
                    }  
    ​  
                    @Override  
                    public String getConstructorInterceptor() {  
                        return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";  
                    }  
                }  
            };  
        }  
    ​  
        @Override  
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {  
            return new InstanceMethodsInterceptPoint[0];  
        }  
    ​  
        @Override  
        protected ClassMatch enhanceClass() {  
            return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");  
        }  
    }
    
    • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    ServerWebExchangeConstructorInterceptor

    skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

    public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {  
        @Override  
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {  
        }  
    }
    
    • ServerWebExchangeConstructorInterceptor目前还是空实现

    小结

    DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

    doc

    • DispatcherHandlerInstrumentation
    • ServerWebExchangeInstrumentation
  • 相关阅读:
    openwrt 的依赖找不到问题
    数据包与IPTABLE关系
    wifidog 配置中文说明
    Java 线程
    Java 集合
    IDEA配置Maven并创建web项目
    逻辑覆盖
    获得天气数据
    小程序项目文件介绍
    window 10 使用git
  • 原文地址:https://www.cnblogs.com/lihanlin/p/12556092.html
Copyright © 2011-2022 走看看