  • Call Chain Series, Part 3: Wrapping zipkin call-chain tracing in a starter for end-to-end tracing across springmvc, dubbo, and restTemplate

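    I. Implementation approach

    1. Filter-based approach

    All call-chain data is instrumented and collected through filters. Every span in the same chain shares one traceId, and each node has its own unique spanId.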
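
    To make the ID relationship concrete, here is a minimal sketch using only the JDK. SpanIds and its methods are hypothetical names chosen for illustration and are not classes from the starter; the one detail taken from the implementation in this article is that the first span's ID doubles as the traceId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical, simplified span record: only the three IDs that link a chain together. */
final class SpanIds {
    final long traceId;   // shared by every span in the chain
    final long spanId;    // unique per node
    final Long parentId;  // null for the root span

    private SpanIds(long traceId, long spanId, Long parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }

    /** Starts a new chain: the root span's ID doubles as the traceId. */
    static SpanIds root() {
        long id = ThreadLocalRandom.current().nextLong();
        return new SpanIds(id, id, null);
    }

    /** Creates the next node: same traceId, fresh spanId, parent = this span. */
    SpanIds child() {
        return new SpanIds(traceId, ThreadLocalRandom.current().nextLong(), spanId);
    }
}

class SpanIdsDemo {
    public static void main(String[] args) {
        SpanIds http = SpanIds.root();          // e.g. created by a servlet filter
        SpanIds rpcClient = http.child();       // e.g. created by an RPC consumer filter
        SpanIds rpcServer = rpcClient.child();  // e.g. created by an RPC provider filter

        List<SpanIds> chain = new ArrayList<>();
        chain.add(http);
        chain.add(rpcClient);
        chain.add(rpcServer);
        for (SpanIds s : chain) {
            System.out.println("trace=" + s.traceId + " span=" + s.spanId + " parent=" + s.parentId);
        }
    }
}
```

    Every printed line carries the same trace value, while the parent of each line is the span of the line before it; that is all a tracing backend needs to rebuild the call tree.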

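    2. How the shared IDs are propagated

    1. RPC calls: through implicit parameters. Dubbo exposes an SPI extension point that lets a filter put the IDs into the request before the RPC call is made. See: Dubbo Series, Part 6: SPI Filter extensions and implicit parameter passing

    2. HTTP calls: the IDs travel in the request headers. They are set before the request is sent and read back by a servlet filter on the receiving side; RestTemplate works the same way.

    See: Call Chain Series, Part 2: Service call tracing with Zipkin and Brave (springmvc, RestTemplate)

    3. Redis, DB, and similar calls: the principle is the same; the call can be intercepted before and after with AOP.

    4. Low intrusion into business code.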
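
    The before/after interception mentioned for Redis and DB calls can be sketched without any framework by using a JDK dynamic proxy. KeyValueStore and TimingProxy are hypothetical names, and a real project would more likely rely on Spring AOP; this only illustrates the shape of the idea, with a list of strings standing in for reported spans.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Redis or DB client interface. */
interface KeyValueStore {
    String get(String key);
}

/** Wraps an interface so every call is timed before and after, like an AOP around advice. */
final class TimingProxy implements InvocationHandler {
    private final Object target;
    private final List<String> records; // stand-in for reporting a span

    private TimingProxy(Object target, List<String> records) {
        this.target = target;
        this.records = records;
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> type, T target, List<String> records) {
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type},
                new TimingProxy(target, records));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        long startNanos = System.nanoTime();            // "before" interception point
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();                         // rethrow the real exception to the caller
        } finally {
            long micros = (System.nanoTime() - startNanos) / 1000; // "after" interception point
            records.add(method.getName() + " took " + micros + "us");
        }
    }
}

class TimingProxyDemo {
    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        KeyValueStore real = new KeyValueStore() {
            @Override
            public String get(String key) {
                return "value-of-" + key;
            }
        };
        KeyValueStore traced = TimingProxy.wrap(KeyValueStore.class, real, records);
        System.out.println(traced.get("user:1"));
        System.out.println(records);
    }
}
```

    The business code keeps calling KeyValueStore as before, which is what keeps the intrusion low: only the wiring changes.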

    II. Architecture diagram

    III. Steps for a business project to adopt the starter

    1. Add the Maven dependency to pom.xml

    <dependency>
        <groupId>trace.starter</groupId>
        <artifactId>trace-starter</artifactId>
        <version>1.0.2-SNAPSHOT</version>
    </dependency>

    2. Add the trace configuration to application.yml

    dubbo.trace:
      enabled: true
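      connectTimeout: 60   # passed to Brave's HttpSpanCollector.Config, which reads it as milliseconds
      readTimeout: 60      # milliseconds as well
      flushInterval: 0     # seconds; 0 means spans are flushed explicitly instead of on a timer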
      compressionEnabled: true
      applicationName: dubbo-zipkin0
      zipkinUrl: http://192.168.1.100:9411

    3. Enable the annotation in the Spring Boot application

    Add @EnableTraceAutoConfigurationProperties, for example:

    package com.example.demo;
    import com.dubbo.trace.configuration.EnableTraceAutoConfigurationProperties;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    @EnableTraceAutoConfigurationProperties
    public class DemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }

    IV. Implementation code

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>trace.starter</groupId>
        <artifactId>trace-starter</artifactId>
        <version>1.0.2-SNAPSHOT</version>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.4.7.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!--zipkin-brave start-->
            <dependency>
                <groupId>io.zipkin.brave</groupId>
                <artifactId>brave-core</artifactId>
                <version>3.9.0</version>
            </dependency>
            <!-- collect spans over HTTP -->
            <dependency>
                <groupId>io.zipkin.brave</groupId>
                <artifactId>brave-spancollector-http</artifactId>
                <version>3.9.0</version>
            </dependency>
            <!--zipkin-brave end-->
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.1.31</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.10</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo</artifactId>
                <version>2.6.1</version>
                <exclusions>
                    <exclusion>
                        <artifactId>spring</artifactId>
                        <groupId>org.springframework</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>commons-lang</groupId>
                <artifactId>commons-lang</artifactId>
                <version>2.6</version>
            </dependency>
    
            <!-- logback logging -->
            <dependency>
                <groupId>net.logstash.logback</groupId>
                <artifactId>logstash-logback-encoder</artifactId>
                <version>4.8</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.auto.value</groupId>
                <artifactId>auto-value</artifactId>
                <version>1.2</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
    
        </dependencies>
    </project>

    1. Base filter: the parent class of all filters

    package com.dubbo.trace.base;
    
    import com.dubbo.trace.TraceContext;
    import com.dubbo.trace.send.TraceAgent;
    import com.dubbo.trace.utils.IdUtils;
    import com.dubbo.trace.utils.NetworkUtils;
    import com.twitter.zipkin.gen.Annotation;
    import com.twitter.zipkin.gen.BinaryAnnotation;
    import com.twitter.zipkin.gen.Endpoint;
    import com.twitter.zipkin.gen.Span;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.http.HttpRequest;
    
    import javax.servlet.http.HttpServletRequest;
    
    
    public abstract class BaseFilter {
    
        /**
         * Create a span
         */
        protected Span createSpan() {
            Span span = new Span();
    
            long id = IdUtils.getId();
            span.setId(id);
    
            Long traceId = TraceContext.getTraceId();
            // First call in the chain: start a new trace and reuse the span ID as the traceId
            if (traceId == null) {
                TraceContext.start();
                traceId = id;
                TraceContext.setTraceId(traceId);
            }
    
            span.setTrace_id(traceId);
            span.setName(TraceContext.getTraceConfig().getApplicationName());
    
            // The previous span on this thread becomes the parent.
            // On the first call there is none, so parent_id stays null and Zipkin treats the span as the root.
            span.setParent_id(TraceContext.getSpanId());
            TraceContext.setSpanId(span.getId());
    
            return span;
        }
    
        /**
         * Add an annotation (a timestamped event such as cs, sr, ss, or cr) to the span
         */
        public void addToAnnotations(Span span, String traceType, Long timeStamp) {
            span.addToAnnotations(
                    Annotation.create(timeStamp, traceType,
                            Endpoint.create(TraceContext.getTraceConfig().getApplicationName(),
                                    NetworkUtils.getIp(),
                                    TraceContext.getTraceConfig().getServerPort()))
            );
        }
    
        /**
         * Add a key/value pair describing the call (parameters, results, errors)
         */
        protected void addToBinary_annotations(Span span, String key, String value) {
            span.addToBinary_annotations(BinaryAnnotation.create(key, value,
                    Endpoint.create(TraceContext.getTraceConfig().getApplicationName(),
                            NetworkUtils.getIp(),
                            TraceContext.getTraceConfig().getServerPort())));
        }
    
        /**
         * Finish the span and hand it to the agent for sending
         */
        public void endTrace(Span span, Long duration, String traceType) {
            addToAnnotations(span, traceType, System.currentTimeMillis() * 1000);
            span.setDuration(duration);
            TraceAgent.getTraceAgent().send(span);
        }
    
        protected void getTraceHttpHeader(HttpServletRequest httpReq) {
    
            String traceId = httpReq.getHeader("trace_id");
            String spanId = httpReq.getHeader("span_id");
    
            if (StringUtils.isNotBlank(traceId)) {
                TraceContext.setTraceId(Long.parseLong(traceId));
            }
    
            if (StringUtils.isNotBlank(spanId)) {
                TraceContext.setSpanId(Long.parseLong(spanId));
            }
        }
    
        protected void setTraceToHttpHeader(HttpRequest httpRequest, Span span) {
            // Internal requests may carry trace info; for external requests, comment these lines out
            httpRequest.getHeaders().set("trace_id", String.valueOf(span.getTrace_id()));
            httpRequest.getHeaders().set("span_id", String.valueOf(span.getId()));
        }
    
    }
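
    The header round trip performed by setTraceToHttpHeader and getTraceHttpHeader can be tried in isolation. In this sketch a plain Map stands in for real HTTP headers, and TraceHeaders is a hypothetical helper; the header names trace_id and span_id are the ones used above. Returning null for a malformed value, rather than throwing, is an assumption about how untrusted input should be handled.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of carrying trace IDs in headers; a Map stands in for the HTTP header collection. */
final class TraceHeaders {
    static final String TRACE_ID = "trace_id";
    static final String SPAN_ID = "span_id";

    /** Client side: write the current IDs into the outgoing headers. */
    static void inject(Map<String, String> headers, long traceId, long spanId) {
        headers.put(TRACE_ID, String.valueOf(traceId));
        headers.put(SPAN_ID, String.valueOf(spanId));
    }

    /** Server side: read one ID back, or null when it is absent or not a number. */
    static Long extract(Map<String, String> headers, String name) {
        String raw = headers.get(name);
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return null; // untrusted input: treat as "no trace" instead of failing the request
        }
    }
}

class TraceHeadersDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        TraceHeaders.inject(headers, 42L, 7L);
        System.out.println(TraceHeaders.extract(headers, TraceHeaders.TRACE_ID)); // prints 42
        System.out.println(TraceHeaders.extract(headers, TraceHeaders.SPAN_ID));  // prints 7

        Map<String, String> empty = new HashMap<>();
        System.out.println(TraceHeaders.extract(empty, TraceHeaders.TRACE_ID));   // prints null
    }
}
```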

    2. Dubbo consumer filter: TraceConsumerFilter.java

    package com.dubbo.trace.dubbo;
    
    import com.alibaba.dubbo.rpc.*;
    import com.dubbo.trace.TraceContext;
    import com.dubbo.trace.base.BaseFilter;
    import com.twitter.zipkin.gen.Span;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import java.util.Arrays;
    import java.util.Map;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    
    @Component
    public class TraceConsumerFilter extends BaseFilter implements Filter {
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            // Start the span
            Span span = this.startTrace(invoker, invocation);
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            // Remote call
            Result result = invoker.invoke(invocation);
    
            // Finish the span
            stopWatch.stop();
            // Record the return value; String.valueOf avoids an NPE for void methods or failed calls
            addToBinary_annotations(span, "results", String.valueOf(result.getValue()));
            this.endTrace(span, stopWatch.getTotalTimeMillis() * 1000, TraceContext.ANNO_CR);
            return result;
        }
    
        protected Span startTrace(Invoker<?> invoker, Invocation invocation) {
            Span span = createSpan();
    
            Long timeStamp = System.currentTimeMillis() * 1000;
            span.setTimestamp(timeStamp);
            span.setName("RPC:" + invoker.getInterface().getSimpleName() + ":" + invocation.getMethodName());
    
            addToAnnotations(span, TraceContext.ANNO_CS, timeStamp);
    
            Map<String, String> attaches = invocation.getAttachments();
            attaches.put(TraceContext.TRACE_ID_KEY, String.valueOf(span.getTrace_id()));
            attaches.put(TraceContext.SPAN_ID_KEY, String.valueOf(span.getId()));
    
            // Record the call arguments
            addToBinary_annotations(span, "params", Arrays.toString(invocation.getArguments()));
            return span;
        }
    }
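
    One way to make sure a span is always finished, even when the wrapped call throws, is to do the timing in try/finally. The sketch below uses only the JDK; TimedCall and FinishedSpan are hypothetical stand-ins for a filter and a Zipkin span, and the only fact borrowed from Zipkin is that timestamps and durations are expressed in microseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Sketch: time a call and always report it, whether it returns or throws. */
final class TimedCall {
    /** Hypothetical stand-in for a finished span. */
    static final class FinishedSpan {
        final String name;
        final long timestampMicros; // wall-clock start
        final long durationMicros;  // elapsed time
        final String error;         // null when the call succeeded

        FinishedSpan(String name, long timestampMicros, long durationMicros, String error) {
            this.name = name;
            this.timestampMicros = timestampMicros;
            this.durationMicros = durationMicros;
            this.error = error;
        }
    }

    static <T> T run(String name, Supplier<T> call, List<FinishedSpan> sink) {
        long timestampMicros = System.currentTimeMillis() * 1000; // epoch time in microseconds
        long startNanos = System.nanoTime();                      // monotonic clock for the duration
        String error = null;
        try {
            return call.get();
        } catch (RuntimeException e) {
            error = String.valueOf(e.getMessage());
            throw e;                                               // the caller still sees the failure
        } finally {
            long durationMicros = (System.nanoTime() - startNanos) / 1000;
            sink.add(new FinishedSpan(name, timestampMicros, durationMicros, error));
        }
    }
}

class TimedCallDemo {
    public static void main(String[] args) {
        List<TimedCall.FinishedSpan> sink = new ArrayList<>();
        String ok = TimedCall.run("ok-call", () -> "result", sink);
        System.out.println(ok);
        try {
            TimedCall.<String>run("failing-call", () -> {
                throw new IllegalStateException("boom");
            }, sink);
        } catch (IllegalStateException expected) {
            System.out.println("caller still sees: " + expected.getMessage());
        }
        for (TimedCall.FinishedSpan s : sink) {
            System.out.println(s.name + " error=" + s.error + " durationMicros=" + s.durationMicros);
        }
    }
}
```

    Both calls end up in the sink, the failing one with its error message attached, so a failed RPC still shows up in the trace instead of leaving a gap.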

    3. Dubbo provider filter: TraceProviderFilter.java

    package com.dubbo.trace.dubbo;
    
    import com.alibaba.dubbo.rpc.*;
    import com.dubbo.trace.TraceContext;
    import com.dubbo.trace.base.BaseFilter;
    import com.twitter.zipkin.gen.Span;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import java.util.Arrays;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    
    @Component
    public class TraceProviderFilter extends BaseFilter implements Filter {
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    
            // Start the span
            Span span = this.startTrace(invoker, invocation);
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            // Invoke the local service implementation
            Result result = invoker.invoke(invocation);
    
            // Finish the span
            stopWatch.stop();
            // Record the return value; String.valueOf avoids an NPE when the method returns null
            this.addToBinary_annotations(span, "results", String.valueOf(result.getValue()));
            this.endTrace(span, stopWatch.getTotalTimeMillis() * 1000, TraceContext.ANNO_SS);
    
            return result;
        }
    
        protected Span startTrace(Invoker<?> invoker, Invocation invocation) {
            // Restore the caller's IDs; a consumer without the trace filter sends no attachments
            String traceId = invocation.getAttachment(TraceContext.TRACE_ID_KEY);
            String spanId = invocation.getAttachment(TraceContext.SPAN_ID_KEY);
    
            if (traceId != null && spanId != null) {
                TraceContext.setTraceId(Long.valueOf(traceId));
                TraceContext.setSpanId(Long.valueOf(spanId));
            } else {
                // Drop IDs left on this pooled thread so createSpan() starts a new trace
                TraceContext.clear();
            }
    
            Span span = createSpan();
    
            Long timeStamp = System.currentTimeMillis() * 1000;
            span.setTimestamp(timeStamp);
            addToAnnotations(span, TraceContext.ANNO_SR, timeStamp);
    
            span.setName("RPC:" + invoker.getInterface().getSimpleName() + ":" + invocation.getMethodName());
    
            // Record the call arguments
            addToBinary_annotations(span, "params", Arrays.toString(invocation.getArguments()));
    
            return span;
        }
    
    }
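
    The four annotations recorded by the consumer and provider filters (cs, sr, ss, cr) are enough to split one RPC into caller time, provider time, and transit time. RpcTimings below is a hypothetical helper with made-up sample values. Transit time is derived as total minus provider time, rather than sr minus cs, because the clocks of two hosts are generally not synchronized.

```java
/** Sketch: what the four core annotations of one RPC let you compute (all values in microseconds). */
final class RpcTimings {
    final long cs; // client send: consumer filter, before the call
    final long sr; // server receive: provider filter, before the implementation runs
    final long ss; // server send: provider filter, after the implementation returns
    final long cr; // client receive: consumer filter, after the call

    RpcTimings(long cs, long sr, long ss, long cr) {
        this.cs = cs;
        this.sr = sr;
        this.ss = ss;
        this.cr = cr;
    }

    /** Time the caller waited; both values come from the consumer's clock. */
    long totalMicros() {
        return cr - cs;
    }

    /** Time spent inside the provider; both values come from the provider's clock. */
    long serverMicros() {
        return ss - sr;
    }

    /** Request plus response transit, computed without comparing clocks across hosts. */
    long networkMicros() {
        return totalMicros() - serverMicros();
    }
}

class RpcTimingsDemo {
    public static void main(String[] args) {
        // Sample values only; the provider's clock is deliberately offset from the consumer's
        RpcTimings t = new RpcTimings(1_000, 50_200, 50_700, 2_000);
        System.out.println("total=" + t.totalMicros());     // prints total=1000
        System.out.println("server=" + t.serverMicros());   // prints server=500
        System.out.println("network=" + t.networkMicros()); // prints network=500
    }
}
```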

    4. RestTemplate interceptor: RestTemplateFilter.java

    package com.dubbo.trace.restTemplate;
    
    import com.dubbo.trace.TraceContext;
    import com.dubbo.trace.base.BaseFilter;
    import com.dubbo.trace.utils.NetworkUtils;
    import com.twitter.zipkin.gen.BinaryAnnotation;
    import com.twitter.zipkin.gen.Endpoint;
    import com.twitter.zipkin.gen.Span;
    import org.springframework.http.HttpRequest;
    import org.springframework.http.client.ClientHttpRequestExecution;
    import org.springframework.http.client.ClientHttpRequestInterceptor;
    import org.springframework.http.client.ClientHttpResponse;
    import org.springframework.util.StopWatch;
    
    import java.io.IOException;
    
    public class RestTemplateFilter extends BaseFilter implements ClientHttpRequestInterceptor {
        @Override
        public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] body, ClientHttpRequestExecution execution) throws IOException {
            // Start the span
            Span span = this.startTrace(httpRequest);
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            try {
                // Internal requests may carry trace info in the headers; comment this line out for external requests
                this.setTraceToHttpHeader(httpRequest, span);
                // Let the request continue down the interceptor chain
                return execution.execute(httpRequest, body);
            } catch (IOException | RuntimeException e) {
                // Record the exception on the span, then rethrow so the caller still sees the failure
                span.addToBinary_annotations(BinaryAnnotation.create("error", String.valueOf(e.getMessage()),
                        Endpoint.create(TraceContext.getTraceConfig().getApplicationName(),
                                NetworkUtils.getIp(),
                                TraceContext.getTraceConfig().getServerPort())));
                throw e;
            } finally {
                // Stop the timer first: StopWatch only counts time for tasks that have been stopped
                stopWatch.stop();
                this.endTrace(span, stopWatch.getTotalTimeMillis() * 1000, TraceContext.ANNO_CR);
            }
        }
    
        private Span startTrace(HttpRequest httpRequest) {
            Span span = createSpan();
            Long timeStamp = System.currentTimeMillis() * 1000;
            span.setTimestamp(timeStamp);
            span.setName("restTemplate:" + httpRequest.getURI() + ":" + httpRequest.getMethod());
            addToAnnotations(span, TraceContext.ANNO_CS, timeStamp);
            return span;
        }
    
    }

    5. Servlet filter: TraceServletFilter.java

    package com.dubbo.trace.servlet;
    
    import com.dubbo.trace.TraceContext;
    import com.dubbo.trace.base.BaseFilter;
    import com.dubbo.trace.utils.NetworkUtils;
    import com.twitter.zipkin.gen.BinaryAnnotation;
    import com.twitter.zipkin.gen.Endpoint;
    import com.twitter.zipkin.gen.Span;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import javax.servlet.*;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    
    @Slf4j
    @Component
    public class TraceServletFilter extends BaseFilter implements Filter {
    
        public TraceServletFilter() {
        }
    
        public void destroy() {
        }
    
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
    
        }
    
        @Override
        public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException {
            HttpServletRequest httpReq = (HttpServletRequest) req;
            HttpServletResponse httpResp = (HttpServletResponse) resp;
            BufferedHttpRequestWrapper newReq = new BufferedHttpRequestWrapper(httpReq);
    
            Span span = this.startTrace(newReq);
            Long timeStamp = System.currentTimeMillis() * 1000;
            span.setTimestamp(timeStamp);
            addToAnnotations(span, TraceContext.ANNO_SR, timeStamp);
            // Set the response headers up front; once the response is committed they can no longer be added
            httpResp.setHeader("trace_id", String.valueOf(span.getTrace_id()));
            httpResp.setHeader("span_id", String.valueOf(span.getId()));
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            try {
                chain.doFilter(newReq, resp);
            } catch (Throwable ex) {
                // Record the failure on the span; the span itself is sent once, in finally
                span.addToBinary_annotations(BinaryAnnotation.create("error", String.valueOf(ex.getMessage()),
                        Endpoint.create(NetworkUtils.getIp() + ":" + TraceContext.getTraceConfig().getServerPort() + httpReq.getRequestURL().toString(),
                                NetworkUtils.getIp(),
                                TraceContext.getTraceConfig().getServerPort())));
                throw ex;
            } finally {
                stopWatch.stop();
                this.endTrace(span, stopWatch.getTotalTimeMillis() * 1000, TraceContext.ANNO_SS);
                // Servlet containers reuse threads, so drop the IDs before the next request
                TraceContext.clear();
            }
    
        }
    
    
        public Span startTrace(HttpServletRequest httpReq) {
            // Read the trace info from the HTTP headers
            getTraceHttpHeader(httpReq);
            Span span = createSpan();
            span.setName("HTTP:" + NetworkUtils.ip + ":" + TraceContext.getTraceConfig().getServerPort() + httpReq.getRequestURI());
    
            // cookies
            // addToBinary_annotations(span,"cookies",Arrays.toString(httpReq.getCookies()));
            return span;
        }
    
    }

    6. BufferedHttpRequestWrapper.java

    package com.dubbo.trace.servlet;
    
    import org.apache.commons.io.IOUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.servlet.ReadListener;
    import javax.servlet.ServletInputStream;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletRequestWrapper;
    import java.io.*;
    
    public class BufferedHttpRequestWrapper extends HttpServletRequestWrapper {
        private static final Logger log = LoggerFactory.getLogger(BufferedHttpRequestWrapper.class);
        private byte[] reqBody = null;
    
        public BufferedHttpRequestWrapper(HttpServletRequest request) throws IOException {
            super(request);
            // Keep the raw bytes; decoding to a String and back could corrupt non-UTF-8 or binary bodies
            this.reqBody = IOUtils.toByteArray(request.getInputStream());
        }
    
        @Override
        public ServletInputStream getInputStream() throws IOException {
            return new MyServletInputStream(new ByteArrayInputStream(this.reqBody));
    
        }
    
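        @Override
        public BufferedReader getReader() throws IOException {
            // Decode with the same charset that getRequestBody() uses instead of the platform default
            return new BufferedReader(new InputStreamReader(this.getInputStream(), "utf-8"));
        }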
    
        public String getRequestBody() throws UnsupportedEncodingException {
            return new String(this.reqBody, "utf-8");
        }
    
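        class MyServletInputStream extends ServletInputStream {
            private final ByteArrayInputStream inputStream;
    
            public MyServletInputStream(ByteArrayInputStream inputStream) {
                this.inputStream = inputStream;
            }
    
            @Override
            public int read() throws IOException {
                return inputStream.read();
            }
    
            @Override
            public boolean isFinished() {
                // The body is fully buffered, so it is finished once nothing is left to read
                return inputStream.available() == 0;
            }
    
            @Override
            public boolean isReady() {
                // In-memory data can always be read without blocking
                return true;
            }
    
            @Override
            public void setReadListener(ReadListener readListener) {
                // Async reads are not supported by this buffered wrapper
            }
        }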
    
    }
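
    The reason for wrapping the request is that a request body stream can only be consumed once. The idea of copying it into memory and handing out fresh streams can be shown with plain JDK streams; ReplayableBody is a hypothetical class for illustration and has no servlet dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Sketch: read a one-shot stream once, then serve it as often as needed. */
final class ReplayableBody {
    private final byte[] bytes;

    private ReplayableBody(byte[] bytes) {
        this.bytes = bytes;
    }

    /** Drains the source completely and keeps the raw bytes. */
    static ReplayableBody buffer(InputStream source) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = source.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return new ReplayableBody(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Every caller gets an independent stream positioned at the start. */
    ByteArrayInputStream newStream() {
        return new ByteArrayInputStream(bytes);
    }

    /** Text view for logging; assumes the body is UTF-8. */
    String asText() {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class ReplayableBodyDemo {
    public static void main(String[] args) {
        ByteArrayInputStream oneShot =
                new ByteArrayInputStream("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        ReplayableBody body = ReplayableBody.buffer(oneShot);

        System.out.println(oneShot.read());            // prints -1: the original stream is used up
        System.out.println(body.asText());             // prints {"id":1}
        System.out.println(body.newStream().read());   // prints 123, the code of '{'
        System.out.println(body.newStream().read());   // prints 123 again: each stream starts over
    }
}
```

    The trade-off is memory: the whole body is held in a byte array for the lifetime of the request, so large uploads may need a size limit.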

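    7. TraceContext.java: the context

    Stores the shared traceId and spanId in an InheritableThreadLocal. A child thread copies the parent's values when it is created, which a plain ThreadLocal does not do; threads that already exist, such as pooled workers, do not see later changes.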

    package com.dubbo.trace;
    
    import com.dubbo.trace.configuration.TraceConfig;
    import com.twitter.zipkin.gen.Span;
    import lombok.Data;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    @Data
    @Component
    public class TraceContext {
        public static final String TRACE_ID_KEY = "traceId";
        public static final String SPAN_ID_KEY = "spanId";
        public static final String ANNO_CS = "cs";
        public static final String ANNO_CR = "cr";
        public static final String ANNO_SR = "sr";
        public static final String ANNO_SS = "ss";
        public static TraceContext traceContext;
        private static ThreadLocal<Long> TRACE_ID = new InheritableThreadLocal<>();
        private static ThreadLocal<Long> SPAN_ID = new InheritableThreadLocal<>();
        private static ThreadLocal<List<Span>> SPAN_LIST = new InheritableThreadLocal<>();
        @Autowired
        private TraceConfig traceConfig;
    
        public static Long getSpanId() {
            return SPAN_ID.get();
        }
    
        public static void setSpanId(Long spanId) {
            SPAN_ID.set(spanId);
        }
    
        public static Long getTraceId() {
            return TRACE_ID.get();
        }
    
        public static void setTraceId(Long traceId) {
            TRACE_ID.set(traceId);
        }
    
        public static void clear() {
            TRACE_ID.remove();
            SPAN_ID.remove();
            SPAN_LIST.remove();
        }
    
        public static void start() {
            clear();
            SPAN_LIST.set(new ArrayList<Span>());
        }
    
        public static TraceConfig getTraceConfig() {
            return traceContext.traceConfig;
        }
    
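        public static void addSpan(Span span) {
            List<Span> spanList = SPAN_LIST.get();
            // start() may not have run on this thread yet; create the list on demand
            if (spanList == null) {
                spanList = new ArrayList<Span>();
                SPAN_LIST.set(spanList);
            }
            spanList.add(span);
        }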
    
        @PostConstruct
        public void init() {
            traceContext = this;
        }
    }
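
    The difference between ThreadLocal and InheritableThreadLocal is easy to check with a small JDK-only experiment. InheritableDemo is a hypothetical class; the last two lines also show a caveat worth knowing: a pooled thread copies the value once, when the thread is created, and does not follow later updates.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class InheritableDemo {
    static final ThreadLocal<Long> PLAIN = new ThreadLocal<>();
    static final ThreadLocal<Long> INHERITABLE = new InheritableThreadLocal<>();

    /** Reads the value from a brand-new child thread. */
    static Long readInNewThread(ThreadLocal<Long> local) {
        AtomicReference<Long> seen = new AtomicReference<>();
        Thread child = new Thread(() -> seen.set(local.get()));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    /** Reads the value from a pool thread, which may be older than the current value. */
    static Long readInPool(ExecutorService pool, ThreadLocal<Long> local) {
        Callable<Long> task = local::get;
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PLAIN.set(1L);
        INHERITABLE.set(1L);
        System.out.println(readInNewThread(PLAIN));        // prints null
        System.out.println(readInNewThread(INHERITABLE));  // prints 1

        ExecutorService pool = Executors.newFixedThreadPool(1);
        System.out.println(readInPool(pool, INHERITABLE)); // prints 1: the worker is created here
        INHERITABLE.set(2L);
        System.out.println(readInPool(pool, INHERITABLE)); // prints 1: the worker kept its old copy
        pool.shutdown();
    }
}
```

    For work handed to a thread pool, the IDs therefore have to be passed along with each task explicitly; inheritance alone only covers threads started during the request.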

    8. Utility classes: generating the traceId and spanId, and getting the local IP

    IdUtils.java

    package com.dubbo.trace.utils;
    
    import java.util.Random;
    
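    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    public class IdUtils {
        private static final Random RANDOM = new Random();
    
        public static Long getId() {
            // Host IP plus the millisecond clock repeats for spans created in the same millisecond,
            // so use a random 64-bit value to keep span IDs unique
            return RANDOM.nextLong();
        }
    }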
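
    Since every node needs a unique spanId, it is worth seeing how different generation strategies behave when many IDs are requested back to back. The sketch below compares an ID built from a host value plus the millisecond clock with a random 64-bit ID. IdCollisionDemo and the host value are hypothetical, and the exact clock-based count varies from run to run.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

class IdCollisionDemo {
    /** Counts how many distinct IDs a generator yields when called n times in a row. */
    static int distinct(LongSupplier generator, int n) {
        Set<Long> seen = new HashSet<>();
        for (int i = 0; i < n; i++) {
            seen.add(generator.getAsLong());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        final int hostPart = 0x7F000001; // hypothetical host value: 127.0.0.1 packed into an int
        int n = 10_000;

        // Same host and same millisecond give the same ID, so most of these collide
        int clockBased = distinct(() -> hostPart + System.currentTimeMillis(), n);

        // Random 64-bit IDs: a collision among 10,000 values is practically impossible
        int random = distinct(() -> ThreadLocalRandom.current().nextLong(), n);

        System.out.println("clock-based distinct IDs: " + clockBased + " of " + n);
        System.out.println("random distinct IDs:      " + random + " of " + n);
    }
}
```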

    NetworkUtils.java

    package com.dubbo.trace.utils;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-29
     * @since 1.0
     */
    public class NetworkUtils {
        public static String ip;
        private static InetAddress addr;
    
        static {
            try {
                addr = InetAddress.getLocalHost();
                ip = addr.getHostAddress(); // local IP address
            } catch (UnknownHostException e) {
                e.printStackTrace();
                ip = "127.0.0.1"; // fall back to loopback so getIp() never parses null
            }
        }
    
        public static int ipToLong(String strIp) {
            int[] ip = new int[4];
            // Find the positions of the dots in the IP address string
            int position1 = strIp.indexOf(".");
            int position2 = strIp.indexOf(".", position1 + 1);
            int position3 = strIp.indexOf(".", position2 + 1);
            // Convert the text between the dots to integers
            ip[0] = Integer.parseInt(strIp.substring(0, position1));
            ip[1] = Integer.parseInt(strIp.substring(position1 + 1, position2));
            ip[2] = Integer.parseInt(strIp.substring(position2 + 1, position3));
            ip[3] = Integer.parseInt(strIp.substring(position3 + 1));
            return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
        }
    
        public static int getIp() {
            return ipToLong(ip);
        }
    }
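
    Packing a dotted IPv4 address into a 32-bit int is plain bit shifting, and the result is negative whenever the first octet is 128 or higher because Java's int is signed. The following sketch shows the conversion in both directions; Ipv4 is a hypothetical helper that also validates its input, which is an addition for illustration.

```java
/** Sketch: pack a dotted IPv4 address into an int and unpack it again. */
final class Ipv4 {
    static int toInt(String dotted) {
        String[] parts = dotted.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + dotted);
        }
        int result = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + part);
            }
            result = (result << 8) | octet; // shift earlier octets up, append this one
        }
        return result;
    }

    static String toDotted(int value) {
        // >>> is the unsigned shift, so the sign bit is treated as data
        return ((value >>> 24) & 0xFF) + "." + ((value >>> 16) & 0xFF) + "."
                + ((value >>> 8) & 0xFF) + "." + (value & 0xFF);
    }
}

class Ipv4Demo {
    public static void main(String[] args) {
        System.out.println(Ipv4.toInt("127.0.0.1"));         // prints 2130706433
        System.out.println(Ipv4.toInt("192.168.1.100"));     // prints -1062731420
        System.out.println(Ipv4.toDotted(-1062731420));      // prints 192.168.1.100
    }
}
```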

    9. Collection agent and sender implementation

    Spans are currently sent over HTTP; sending through an MQ can be added later.

    SendHttp.java

    package com.dubbo.trace.send;
    
    import com.github.kristofa.brave.http.HttpSpanCollector;
    import com.twitter.zipkin.gen.Span;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.stereotype.Component;
    
    
    @Component
    @DependsOn("zipkinConfig")
    public class SendHttp extends TraceAgent {
    
        @Autowired
        private HttpSpanCollector httpSpanCollector;
    
    
        @Override
        public void send(Span span) {
            if (span != null) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(span.toString());
                        httpSpanCollector.collect(span);
                        httpSpanCollector.flush();
                    }
                });
            }
        }
    
    }

    TraceAgent.java

    package com.dubbo.trace.send;
    
    import com.twitter.zipkin.gen.Span;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    public abstract class TraceAgent {
        private static TraceAgent traceAgent;
        private final int THREAD_POOL_COUNT = 5;
        protected final ExecutorService executor =
                Executors.newFixedThreadPool(this.THREAD_POOL_COUNT, new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread worker = new Thread(r);
                        worker.setName("TRACE-AGENT-WORKER");
                        worker.setDaemon(true);
                        return worker;
                    }
                });
    
        public static TraceAgent getTraceAgent() {
            return traceAgent;
        }
    
        public abstract void send(Span span);
    
        @PostConstruct
        public void init() {
            traceAgent = this;
        }
    }
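
    The agent pattern, handing each span to a small pool of daemon threads so the business thread never waits on the network, can be exercised without Zipkin. In this sketch AsyncReporter is a hypothetical class and a Consumer stands in for the real transport; the close method is an addition that shows one way to drain queued items on shutdown.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Sketch: report items asynchronously on daemon worker threads. */
final class AsyncReporter<T> {
    private final ExecutorService executor;
    private final Consumer<T> transport; // stand-in for the HTTP or MQ sender

    AsyncReporter(int threads, Consumer<T> transport) {
        AtomicInteger counter = new AtomicInteger();
        this.executor = Executors.newFixedThreadPool(threads, r -> {
            Thread worker = new Thread(r, "TRACE-AGENT-WORKER-" + counter.incrementAndGet());
            worker.setDaemon(true); // never keep the JVM alive just to report spans
            return worker;
        });
        this.transport = transport;
    }

    /** Returns immediately; the transport runs on a worker thread. */
    void send(T item) {
        if (item == null) {
            return;
        }
        executor.execute(() -> {
            try {
                transport.accept(item);
            } catch (RuntimeException e) {
                // Tracing must not break the application: log and drop
                System.err.println("span dropped: " + e);
            }
        });
    }

    /** Stops accepting new items and waits up to the timeout for queued ones. */
    boolean close(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class AsyncReporterDemo {
    public static void main(String[] args) {
        List<String> sent = new CopyOnWriteArrayList<>();
        AsyncReporter<String> reporter = new AsyncReporter<>(2, sent::add);
        reporter.send("span-1");
        reporter.send("span-2");
        boolean drained = reporter.close(5, TimeUnit.SECONDS);
        System.out.println("drained=" + drained + " sent=" + sent.size());
    }
}
```

    Because the workers are daemon threads, spans still queued when the JVM exits are lost unless something like close is called first.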

    10. Configuration classes

    FilterConfig.java

    package com.dubbo.trace.configuration;
    
    import com.dubbo.trace.restTemplate.RestTemplateFilter;
    import com.dubbo.trace.servlet.TraceServletFilter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.web.servlet.FilterRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.ClientHttpRequestInterceptor;
    import org.springframework.web.client.RestTemplate;
    
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    
    @Configuration
    public class FilterConfig {
    
    
        /**
         * RestTemplate used for outbound HTTP calls. The interceptor added in init() records the cs and cr annotations.
         */
    
        @Autowired
        private RestTemplate restTemplate;
    
        @Bean
        RestTemplate template() {
            return new RestTemplate();
        }
    
        // Add the RestTemplate interceptor
        @PostConstruct
        public void init() {
            List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(restTemplate.getInterceptors());
            interceptors.add(new RestTemplateFilter());
            restTemplate.setInterceptors(interceptors);
        }
    
        /**
         * Servlet filter: the custom filter records the sr and ss annotations
         *
         * @return
         */
        @Bean
        public FilterRegistrationBean callTrackingServletFilter() {
            TraceServletFilter filter = new TraceServletFilter();
            FilterRegistrationBean registrationBean = new FilterRegistrationBean();
            registrationBean.setFilter(filter);
            List<String> urlPatterns = new ArrayList<String>();
            urlPatterns.add("/*");
            registrationBean.setUrlPatterns(urlPatterns);
            registrationBean.setOrder(1);
            return registrationBean;
        }
    }

    TraceConfig.java

    package com.dubbo.trace.configuration;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    @ConfigurationProperties(prefix = "dubbo.trace")
    @Component
    public class TraceConfig {
    
        private boolean enabled;
        private int connectTimeout;
        private int readTimeout;
        private int flushInterval;
        private boolean compressionEnabled;
        private String zipkinUrl;
        private int serverPort = 9411;
        private String applicationName;
    
        public boolean isEnabled() {
            return enabled;
        }
    
        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }
    
        public int getConnectTimeout() {
            return connectTimeout;
        }
    
        public void setConnectTimeout(int connectTimeout) {
            this.connectTimeout = connectTimeout;
        }
    
        public int getReadTimeout() {
            return readTimeout;
        }
    
        public void setReadTimeout(int readTimeout) {
            this.readTimeout = readTimeout;
        }
    
        public int getFlushInterval() {
            return flushInterval;
        }
    
        public void setFlushInterval(int flushInterval) {
            this.flushInterval = flushInterval;
        }
    
        public boolean isCompressionEnabled() {
            return compressionEnabled;
        }
    
        public void setCompressionEnabled(boolean compressionEnabled) {
            this.compressionEnabled = compressionEnabled;
        }
    
        public String getZipkinUrl() {
            return zipkinUrl;
        }
    
        public void setZipkinUrl(String zipkinUrl) {
            this.zipkinUrl = zipkinUrl;
        }
    
        public int getServerPort() {
            return serverPort;
        }
    
        public void setServerPort(int serverPort) {
            this.serverPort = serverPort;
        }
    
        public String getApplicationName() {
            return applicationName;
        }
    
        public void setApplicationName(String applicationName) {
            this.applicationName = applicationName;
        }
    }

    ZipkinConfig.java

    package com.dubbo.trace.configuration;
    
    import com.github.kristofa.brave.Brave;
    import com.github.kristofa.brave.Brave.Builder;
    import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
    import com.github.kristofa.brave.Sampler;
    import com.github.kristofa.brave.SpanCollector;
    import com.github.kristofa.brave.http.HttpSpanCollector;
    import com.github.kristofa.brave.http.HttpSpanCollector.Config;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ZipkinConfig {
    
        @Autowired
        TraceConfig traceConfig;
    
        /**
         * Configures the span collector that reports spans to Zipkin.
         *
         * @return the HTTP span collector
         */
        @Bean("httpSpanCollector")
        public HttpSpanCollector spanCollector() {
            Config config = Config.builder().compressionEnabled(traceConfig.isCompressionEnabled()).connectTimeout(traceConfig.getConnectTimeout())
                    .flushInterval(traceConfig.getFlushInterval()).readTimeout(traceConfig.getReadTimeout()).build();
            return HttpSpanCollector.create(traceConfig.getZipkinUrl(), config, new EmptySpanCollectorMetricsHandler());
        }
    
        /**
         * Wraps Brave's tracing utilities.
         *
         * @param spanCollector the collector that reports spans to Zipkin
         * @return the configured Brave instance
         */
        @Bean
        public Brave brave(SpanCollector spanCollector) {
            Builder builder = new Builder(traceConfig.getApplicationName()); // service name
            builder.spanCollector(spanCollector);
            builder.traceSampler(Sampler.create(1)); // sampling rate: 1 samples every trace
            return builder.build();
        }
    
    
    }
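
    The `traceSampler` call in `ZipkinConfig` uses a rate of 1, so every trace is sampled. As a rough illustration of what a fractional rate would mean, the sketch below decides per trace id whether a trace is kept. `RateSamplerSketch` and its bucket scheme are hypothetical and are not Brave's implementation; they only show the idea of a rate between 0 and 1.

```java
/**
 * Hypothetical sketch of rate-based sampling; not Brave's Sampler.
 * A trace is kept when its id falls into the first (rate * 10000) buckets.
 */
final class RateSamplerSketch {
    private static final long BUCKETS = 10_000L;
    private final long boundary;

    RateSamplerSketch(float rate) {
        if (rate < 0f || rate > 1f) {
            throw new IllegalArgumentException("rate must be between 0 and 1");
        }
        this.boundary = (long) (rate * BUCKETS);
    }

    boolean isSampled(long traceId) {
        // floorMod keeps the bucket non-negative even for negative trace ids
        return Math.floorMod(traceId, BUCKETS) < boundary;
    }
}
```

    With a rate of 1 the boundary covers every bucket, which matches the "sample everything" behaviour configured above; a rate of 0 keeps nothing.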

    11. Startup configuration

    EnableTraceAutoConfiguration.java

    package com.dubbo.trace.configuration;
    
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    @Configuration
    @ConditionalOnBean(annotation = EnableTraceAutoConfigurationProperties.class)
    @AutoConfigureAfter(SpringBootConfiguration.class)
    @EnableConfigurationProperties(TraceConfig.class)
    @ComponentScan(basePackages = "com.dubbo.trace")
    public class EnableTraceAutoConfiguration {
    }

    EnableTraceAutoConfigurationProperties.java

    package com.dubbo.trace.configuration;
    
    import java.lang.annotation.*;
    
    /**
     * @author 王柱星
     * @version 1.0
     * @title
     * @time 2018-10-25
     * @since 1.0
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface EnableTraceAutoConfigurationProperties {
    }

    12. Custom starter configuration

    resources/META-INF/spring.factories

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.dubbo.trace.configuration.EnableTraceAutoConfiguration

    13. Dubbo filter configuration

    resources/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter

    traceConsumerFilter=com.dubbo.trace.dubbo.TraceConsumerFilter
    traceProviderFilter=com.dubbo.trace.dubbo.TraceProviderFilter
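
    Each line in this file maps an extension name to a filter class. In Dubbo, a filter registered this way is either activated automatically through an `@Activate` annotation on the class or referenced by its name in the `filter` attribute of a service or reference. The sketch below only illustrates the name-to-class mapping, using `java.util.Properties`; Dubbo's own extension loader does more than this, and `FilterSpiFileSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical sketch: reads "name=class" lines the way the SPI file lists them. */
final class FilterSpiFileSketch {

    /** Parses the file content into a name-to-class-name lookup. */
    static Properties parse(String content) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(content));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked if it does
            throw new UncheckedIOException(e);
        }
        return mapping;
    }
}
```

    Looking up `traceConsumerFilter` in the parsed result yields `com.dubbo.trace.dubbo.TraceConsumerFilter`, the class Dubbo would instantiate for that name.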

    V. Verification

    1. Start the test projects:

    Springboot-Zipkin0、Springboot-Zipkin1

    2. Visit: http://192.168.1.100:8080/testDubbo

    3. Open the Zipkin address:

    http://47.52.199.100:9411
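
    Besides opening the UI, the result can also be checked programmatically. The sketch below assumes the Zipkin server exposes the v1 query endpoint `/api/v1/traces` with `serviceName` and `limit` parameters; whether that endpoint is available depends on the Zipkin version in use. `ZipkinTraceCheck` and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

/** Hypothetical helper for checking that traces reached Zipkin. */
final class ZipkinTraceCheck {

    /** Builds the query URL; the /api/v1/traces path is an assumption about the server. */
    static String tracesUrl(String zipkinBaseUrl, String serviceName, int limit) {
        try {
            return zipkinBaseUrl + "/api/v1/traces?serviceName="
                    + URLEncoder.encode(serviceName, "UTF-8") + "&limit=" + limit;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always supported by the JVM
            throw new IllegalStateException(e);
        }
    }

    /** Sends a GET request and returns the HTTP status code. */
    static int fetchStatus(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

    For the setup in this article, `tracesUrl("http://47.52.199.100:9411", "dubbo-zipkin0", 10)` would build the query for the service name configured in application.yml.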

    VI. Code repository

    https://github.com/Star-Lordxing/trace_starter_xing

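    VII. Ideas for extension

    1. This Zipkin setup currently keeps its data in Cassandra, in memory. Where performance requirements are higher, Elasticsearch (ES) storage can be used; Zipkin already provides that implementation, so only the startup parameters need to change.

    2. Zipkin currently shows only basic call-chain information. Error frequency, average call time, call counts and similar metrics are not available there and have to come from other tools.

    3. Zipkin shows only request-level errors, not business-level errors; for those, check ELK.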
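
    The metrics mentioned in point 2 (error frequency, average call time, call count) can be derived from span data once it is available outside Zipkin. The following is a minimal sketch of such an aggregation. `SpanRecord` is a hypothetical, simplified record (span name, duration in microseconds, error flag) and not Zipkin's or Brave's own span model.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: per-span-name call count, error rate and average duration. */
final class SpanStatsSketch {

    /** Simplified span record assumed for this example only. */
    static final class SpanRecord {
        final String name;
        final long durationMicros;
        final boolean error;

        SpanRecord(String name, long durationMicros, boolean error) {
            this.name = name;
            this.durationMicros = durationMicros;
            this.error = error;
        }
    }

    /** Running totals for one span name. */
    static final class Stats {
        long calls;
        long errors;
        long totalDurationMicros;

        double averageDurationMicros() {
            return calls == 0 ? 0.0 : (double) totalDurationMicros / calls;
        }

        double errorRate() {
            return calls == 0 ? 0.0 : (double) errors / calls;
        }
    }

    /** Groups spans by name and accumulates the totals for each group. */
    static Map<String, Stats> aggregate(List<SpanRecord> spans) {
        Map<String, Stats> byName = new TreeMap<>();
        for (SpanRecord span : spans) {
            Stats stats = byName.computeIfAbsent(span.name, key -> new Stats());
            stats.calls++;
            if (span.error) {
                stats.errors++;
            }
            stats.totalDurationMicros += span.durationMicros;
        }
        return byName;
    }
}
```

    In practice a dedicated metrics tool would do this work; the sketch only shows that the three figures follow directly from per-span duration and error data.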

  • Original article: https://www.cnblogs.com/wangzhuxing/p/9944135.html