zoukankan      html  css  js  c++  java
  • opentracting+jager分布式链路追踪探索实践

    一、Opentracing

    opentracing通过提供平台无关、厂商无关的API,使得开发人员可以方便地实现追踪系统。opentracing提供了用于运营支撑系统和针对特定平台的辅助程序库,被跟踪的服务只需要调用这套接口,就可以被任何实现这套接口的跟踪后台(比如Zipkin, Jaeger等等)支持,而作为一个跟踪后台,只要实现了个这套接口,就可以跟踪到任何调用这套接口的服务。

    二、Jaeger

    Jaeger是Uber开源基于golang的分布式跟踪系统,使用Jaeger可以非常直观的展示整个分布式系统的调用链,由此可以很好发现和解决问题。

    Jaeger的整体架构如下:

    Jaeger组件

    jaeger-agent

    jaeger-agent是一个网络守护进程,监听通过UDP发送过来的Span,它会将其批量发送给collector。按照设计,Agent要被部署到所有主机上,作为基础设施。Agent将collector和客户端之间的路由与发现机制抽象出来。

    jaeger-collector

    jaeger-collector从Jaeger Agent接收Trace,并通过一个处理管道对其进行处理。目前的管道会校验Trace、建立索引、执行转换并最终进行存储。存储是一个可插入的组件,目前支持Cassandra和elasticsearch。

    jaeger-query

    jaeger-query服务会从存储中检索Trace并通过UI界面进行展现,通过UI界面可以展现Trace的详细信息。

    三、 安装使用

    1.下载安装jaeger

    https://www.jaegertracing.io/download/

    下载并解压

    增加执行权限:

    chmod a+x jaeger-*

    2. 安装golang package

    go get github.com/opentracing/opentracing-go

    go get github.com/uber/jaeger-client-go

    3. 指定用ES存储

    export SPAN_STORAGE_TYPE=elasticsearch

    export ES_SERVER_URLS=http://10.20.xx.xx:9200

    4. 运行jarger

     cd jaeger-1.18.0-linux-amd64

    ./jaeger-all-in-one

    在浏览器输入http://10.20.xx.xx:16686/即可打开jaeger页面:

     

    使用docker运行jaeger的各组件:

    docker-compose.yaml

    version: '3'
    services:
      jaeger-agent:
        image: jaegertracing/jaeger-agent:1.18
        stdin_open: true
        tty: true
        links:
        - jaeger-collector:jaeger-collector
        ports:
        - 6831:6831/udp
        command:
        - --reporter.grpc.host-port=jaeger-collector:14250
      jaeger-collector:
        image: jaegertracing/jaeger-collector:1.18
        environment:
          SPAN_STORAGE_TYPE: elasticsearch
          ES_SERVER_URLS: http://10.20.xx.xx:9200
        stdin_open: true
        tty: true
      jaeger-query:
        image: jaegertracing/jaeger-query:1.18
        environment:
          SPAN_STORAGE_TYPE: elasticsearch
          ES_SERVER_URLS: http://10.20.xx.xx:9200
        stdin_open: true
        tty: true
        ports:
        - 16686:16686/tcp
    

    四、 在gin中使用jaeger

    以wire依赖注入的方式引入jaeger:

    package jaeger
    
    import (
    	"fmt"
    	"github.com/google/wire"
    	"github.com/opentracing/opentracing-go"
    	"github.com/pkg/errors"
    	"github.com/spf13/viper"
    	"github.com/uber/jaeger-client-go"
    	"github.com/uber/jaeger-client-go/config"
    	"go.uber.org/zap"
    	"io"
    )
    
    // ClientType 定义jaeger client 结构体
    type ClientType struct {
    	Tracer opentracing.Tracer
    	Closer io.Closer
    }
    
    // Client  jaeger连接类型
    var Client ClientType
    
    // Options jaeger option
    type Options struct {
    	Type               string  // const
    	Param              float64 // 1
    	LogSpans           bool    // true
    	LocalAgentHostPort string  // host:port
    	Service            string  // service name
    }
    
    // NewOptions for jaeger
    func NewOptions(v *viper.Viper, logger *zap.Logger) (*Options, error) {
    	var (
    		err error
    		o   = new(Options)
    	)
    	if err = v.UnmarshalKey("jaeger", o); err != nil {
    		return nil, errors.Wrap(err, "unmarshal redis option error")
    	}
    
    	logger.Info("load jaeger options success", zap.Any("jaeger options", o))
    	return o, err
    }
    
    // New returns an instance of Jaeger Tracer that samples 100% of traces and logs all spans to stdout.
    func New(o *Options) (opentracing.Tracer, error) {
    	cfg := &config.Configuration{
    		Sampler: &config.SamplerConfig{
    			Type:  o.Type,
    			Param: o.Param,
    		},
    		Reporter: &config.ReporterConfig{
    			LogSpans: o.LogSpans,
    			// 注意:填下地址不能加http://
    			LocalAgentHostPort: o.LocalAgentHostPort,
    		},
    	}
    	tracer, closer, err := cfg.New(o.service, config.Logger(jaeger.StdLogger))
    	if err != nil {
    		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v
    ", err))
    	}
    	Client.Tracer = tracer
    	Client.Closer = closer
    
    	return tracer, err
    }
    
    // ProviderSet inject jaeger settings
    var ProviderSet = wire.NewSet(New, NewOptions) 

    初始化全局jaeger客户端,调用jaeger

    // Article 发布文章
    func (pc *ArticleController) Article(c *gin.Context) {
    	var req requests.Article
    	if err := c.ShouldBind(&req); err != nil {
    		pc.logger.Error("参数错误", zap.Error(err))
    		c.JSON(http.StatusBadRequest, httputil.Error(nil, "参数校验失败"))
    		return
    	}
    	tracer := jaeger.Client.Tracer
    	opentracing.SetGlobalTracer(tracer)
    	span := tracer.StartSpan("Article")
    	defer span.Finish()
    	ctx := context.Background()
    	ctx = opentracing.ContextWithSpan(ctx, span)
    	span.SetTag("http.url", c.Request.URL.Path)
    	article, err := pc.service.Article(ctx, &req)
    	if err != nil {
    		pc.logger.Error("发表文章失败", zap.Error(err))
    		c.JSON(http.StatusInternalServerError, httputil.Error(nil, "发表主题失败"))
    		return
    	}
    	span.LogFields(
    		log.String("event", "info"),
    		log.Int("article", article.ID),
    	)
    	c.JSON(http.StatusOK, gin.H{"code": 0, "msg": "success", "data": article})
    }
    

    通过span和context传递tracer

    // Article 发表文章
    func (s *DefaultArticleService) Article(ctx context.Context, req *requests.Article) (responses.Article, error) {
    	var result responses.Article
    	cateInfo := s.Repository.GetCategory(req.CategoryID)
    	if cateInfo.ID == 0 {
    		s.logger.Error("categoryId 不存在", zap.Any("category_id", req.CategoryID))
    		return result, gorm.ErrRecordNotFound
    	}
    	span, _ := opentracing.StartSpanFromContext(ctx, "Article")
    	defer span.Finish()
    	span.LogFields(
    		log.String("event", "insert article service"),
    	)
    
    	res, err := s.Repository.Article(req)
    
    	if err != nil {
    		s.logger.Error("发表文章失败", zap.Error(err))
    		return result, errors.New("发表文章失败")
    	}
    
    	result = responses.Article{
    		ID:    res.ID,
    		Title: res.Title,
    	}
    	return result, err
    }

    jaeger调用详情:

    五、 在go micro中使用jaeger

    go micro中各rpc服务之间的调用定位异常较为困难,可以在网关处初始化jaeger,通过传递context的方式记录追踪调用的service

    具体代码实现如下:

    func initJaeger(service string) (opentracing.Tracer, io.Closer) {
    	cfg := &jaegercfg.Configuration{
    		Sampler: &jaegercfg.SamplerConfig{
    			Type:  "const",
    			Param: 1,
    		},
    		Reporter: &jaegercfg.ReporterConfig{
    			LogSpans: true,
    			// 注意:填下地址不能加http:
    			LocalAgentHostPort: "192.168.33.16:6831",
    		},
    	}
    	tracer, closer, err := cfg.New(service, jaegercfg.Logger(jaeger.StdLogger))
    	if err != nil {
    		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v
    ", err))
    	}
    	return tracer, closer
    }
    
    func main() {
    	userRpcFlag := cli.StringFlag{
    		Name:  "f",
    		Value: "./config/config_api.json",
    		Usage: "please use xxx -f config_rpc.json",
    	}
    	configFile := flag.String(userRpcFlag.Name, userRpcFlag.Value, userRpcFlag.Usage)
    	flag.Parse()
    	conf := new(gateWayConfig.ApiConfig)
    
    	if err := config.LoadFile(*configFile); err != nil {
    		log.Fatal(err)
    	}
    	if err := config.Scan(conf); err != nil {
    		log.Fatal(err)
    	}
    
    	tracer, _ := initJaeger("micro-message-system.gateway")
    	opentracing.SetGlobalTracer(tracer)
    
    	engineGateWay, err := gorm.Open(conf.Engine.Name, conf.Engine.DataSource)
    	if err != nil {
    		log.Fatal(err)
    	}
    	etcdRegisty := etcdv3.NewRegistry(
    		func(options *registry.Options) {
    			options.Addrs = conf.Etcd.Address
    		});
    
    	// Create a new service. Optionally include some options here.
    	rpcService := micro.NewService(
    		micro.Name(conf.Server.Name),
    		micro.Registry(etcdRegisty),
    		micro.Transport(grpc.NewTransport()),
    		micro.WrapClient(
    			hystrix.NewClientWrapper(),
    			wrapperTrace.NewClientWrapper(tracer),
    		), // 客户端熔断、链路追踪
    		micro.WrapHandler(wrapperTrace.NewHandlerWrapper(tracer)),
    		micro.Flags(userRpcFlag),
    	)
    	rpcService.Init()
    
    	// 创建用户服务客户端 直接可以通过它调用user rpc的服务
    	userRpcModel := userProto.NewUserService(conf.UserRpcServer.ServerName, rpcService.Client())
    
    	// 创建IM服务客户端 直接可以通过它调用im prc的服务
    	imRpcModel := imProto.NewImService(conf.ImRpcServer.ServerName, rpcService.Client())
    
    	gateWayModel := models.NewGateWayModel(engineGateWay)
    
    	// 把用户服务客户端、IM服务客户端 注册到网关
    	gateLogic := logic.NewGateWayLogic(userRpcModel, gateWayModel, conf.ImRpcServer.ImServerList, imRpcModel)
    	gateWayController := controller.NewGateController(gateLogic)
    	// web.NewService会在启动web server的同时将rpc服务注册进去
    	service := web.NewService(
    		web.Name(conf.Server.Name),
    		web.Registry(etcdRegisty),
    		web.Version(conf.Version),
    		web.Flags(userRpcFlag),
    		web.Address(conf.Port),
    	)
    	router := gin.Default()
    
    	userRouterGroup := router.Group("/gateway")
    	// 中间件验证
    	userRouterGroup.Use(middleware.ValidAccessToken)
    	{
    		userRouterGroup.POST("/send", gateWayController.SendHandle)
    		userRouterGroup.POST("/address", gateWayController.GetServerAddressHandle)
    	}
    	service.Handle("/", router)
    	if err := service.Run(); err != nil {
    		log.Fatal(err)
    	}
    
    }
    

    调用效果如下:

     

  • 相关阅读:
    eclipse web项目转maven项目
    spark作业
    大数据学习——spark-steaming学习
    大数据学习——sparkSql对接hive
    大数据学习——sparkSql对接mysql
    大数据学习——sparkSql
    大数据学习——spark运营案例
    大数据学习——spark笔记
    大数据学习——sparkRDD
    python面试题
  • 原文地址:https://www.cnblogs.com/FG123/p/13508368.html
Copyright © 2011-2022 走看看