zoukankan      html  css  js  c++  java
  • 微服务常用中间件

    前置中间件

    func PrepareMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    
    		//处理traceId
    		var traceId string
    		//从ctx获取grpc的metadata
    		md, ok := metadata.FromIncomingContext(ctx)
    		if ok {
    			vals, ok := md[util.TraceID]
    			if ok && len(vals) > 0 {
    				traceId = vals[0]
    			}
    		}
    
    		if len(traceId) == 0 {
    			traceId = logs.GenTraceId()
    		}
    
    		ctx = logs.WithFieldContext(ctx) // 从ctx拿出field 没有新建一个
    		ctx = logs.WithTraceId(ctx, traceId)	// 从traceId放入ctx
    		resp, err = next(ctx, req)
    		return
    	}
    }
    

    限流中间件

    计数限流

    type CounterLimit struct {
    	counter      int64 //计数器
    	limit        int64 //指定时间窗口内允许的最大请求数
    	intervalNano int64 //指定的时间窗口
    	unixNano     int64 //unix时间戳,单位为纳秒
    }
    
    func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {
    
    	return &CounterLimit{
    		counter:      0,
    		limit:        limit,
    		intervalNano: int64(interval),
    		unixNano:     time.Now().UnixNano(),
    	}
    }
    
    func (c *CounterLimit) Allow() bool {
    
    	now := time.Now().UnixNano()
    	if now-c.unixNano > c.intervalNano { //如果当前过了当前的时间窗口,则重新进行计数
    		atomic.StoreInt64(&c.counter, 0)
    		atomic.StoreInt64(&c.unixNano, now)
    		return true
    	}
    
    	atomic.AddInt64(&c.counter, 1)
    	return c.counter < c.limit //判断是否要进行限流
    }
    
    

    桶限流

    type BucketLimit struct {
    	rate       float64 //漏桶中水的漏出速率
    	bucketSize float64 //漏桶最多能装的水大小
    	unixNano   int64   //unix时间戳
    	curWater   float64 //当前桶里面的水
    }
    
    func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
    	return &BucketLimit{
    		bucketSize: float64(bucketSize),
    		rate:       rate,
    		unixNano:   time.Now().UnixNano(),
    		curWater:   0,
    	}
    }
    
    func (b *BucketLimit) reflesh() {
    	now := time.Now().UnixNano()
    	//时间差, 把纳秒换成秒
    	diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000
    	b.curWater = math.Max(0, b.curWater-diffSec*b.rate)
    	b.unixNano = now
    	return
    }
    
    func (b *BucketLimit) Allow() bool {
    	b.reflesh()
    	if b.curWater < b.bucketSize {
    		b.curWater = b.curWater + 1
    		return true
    	}
    
    	return false
    }
    

    令牌桶限流

    • 基于golang.org/x/time/rate
    import "golang.org/x/time/rate"
    
    var limiter = rate.NewLimiter(50,100) // 50为限速QPS 100为桶的大小
    

    中间件实现

    type Limiter interface { 
    	Allow() bool
    }
    
    func NewRateLimitMiddleware(l Limiter) Middleware {
    	return func(next MiddlewareFunc) MiddlewareFunc {
    		return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    			allow := l.Allow()
    			if !allow {
    				err = status.Error(codes.ResourceExhausted, "rate limited")
    				return
    			}
    
    			return next(ctx, req)
    		}
    	}
    }
    

    熔断器中间件

    配置

    • 基于github.com/afex/hystrix-go/hystrix
    hystrix.ConfigureCommand("服务名", hystrix.CommandConfig{
        Timeout:	1000, 				   // 超时配置,默认1000ms
        MaxConcurrentRequests:	100, 		// 并发控制,默认是10
        RequestVolumeThreshold:	500, 		// 熔断器打开之后,冷却的时间,默认是500ms
        SleepWindow:30,     			   // 一个统计窗口的请求数量,默认是20
        ErrorPercentThreshold:50, 		   // 失败百分比,默认是50%
    })
    

    触发条件

    • 一个统计窗口内,请求数量大于RequestVolumeThreshold,且失败率大于ErrorPercentThreshold, 才会触发熔断

    中间件实现

    func HystrixMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{},err error) {
    
    		rpcMeta := meta.GetRpcMeta(ctx)
    
    		hystrixErr := hystrix.Do(rpcMeta.ServiceName, func() (err error) {
    			resp, err = next(ctx, req)
    			return err
    		}, nil)
    
    		if hystrixErr != nil {
                 err = hystrixErr
    			return
    		}
    
    		return 
    	}
    }
    

    服务注册与发现中间件

    配置

    https://www.cnblogs.com/zhaohaiyu/p/13566315.html

    中间件实现

    func NewDiscoveryMiddleware(discovery registry.Registry) Middleware {
    	return func(next MiddlewareFunc) MiddlewareFunc {
    		return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    			//从ctx获取rpc的metadata
    			rpcMeta := meta.GetRpcMeta(ctx)
    			if len(rpcMeta.AllNodes) > 0 {
    				return next(ctx, req)
    			}
    
    			service, err := discovery.GetService(ctx, rpcMeta.ServiceName)
    			if err != nil {
    				logs.Error(ctx, "discovery service:%s failed, err:%v", rpcMeta.ServiceName, err)
    				return
    			}
    
    			rpcMeta.AllNodes = service.Nodes
    			resp, err = next(ctx, req)
    			return
    		}
    	}
    }
    

    短连接中间件

    配置

    type RpcMeta struct {
    	//调用方名字
    	Caller string
    	//服务提供方
    	ServiceName string
    	//调用的方法
    	Method string
    	//调用方集群
    	CallerCluster string
    	//服务提供方集群
    	ServiceCluster string
    	//TraceID
    	TraceID string
    	//环境
    	Env string
    	//调用方IDC
    	CallerIDC string
    	//服务提供方IDC
    	ServiceIDC string
    	//当前节点
    	CurNode *registry.Node
    	//历史选择节点
    	HistoryNodes []*registry.Node
    	//服务提供方的节点列表
    	AllNodes []*registry.Node
    	//当前请求使用的连接
    	Conn *grpc.ClientConn
    }
    
    type rpcMetaContextKey struct{}
    
    func GetRpcMeta(ctx context.Context) *RpcMeta {
    	meta, ok := ctx.Value(rpcMetaContextKey{}).(*RpcMeta)
    	if !ok {
    		meta = &RpcMeta{}
    	}
    
    	return meta
    }
    

    中间件实现

    func ShortConnectMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    		//从ctx获取rpc的metadata
    		rpcMeta := meta.GetRpcMeta(ctx)
    		if rpcMeta.CurNode == nil{
    			err = errno.InvalidNode
    			logs.Error(ctx, "invalid instance")
    			return
    		}
    
    		address := fmt.Sprintf("%s:%d", rpcMeta.CurNode.IP, rpcMeta.CurNode.Port)
    		conn, err := grpc.Dial(address, grpc.WithInsecure())
    		if err != nil {
    			logs.Error(ctx, "connect %s failed, err:%v", address, err)
    			return nil, errno.ConnFailed
    		}
    
    		rpcMeta.Conn = conn
    		defer conn.Close()
    		resp, err = next(ctx, req)
    		return
    	}
    }
    
    

    负载均衡中间件

    配置

    选择
    const (
    	DefaultNodeWeight = 100
    )
    
    type LoadBalanceType int
    
    const (
    	LoadBalanceTypeRandom = iota
    	LoadBalanceTypeRoundRobin 
    )
    
    type LoadBalance interface {
    	Name() string
    	Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error)
    }
    
    func GetLoadBalance(balanceType LoadBalanceType) (balancer LoadBalance) {
    
    	switch (balanceType) {
    		 case LoadBalanceTypeRandom:
    			 balancer = NewRandomBalance()
    		 case LoadBalanceTypeRoundRobin:
    			 balancer = NewRoundRobinBalance()
    		 default:
    			 balancer = NewRandomBalance()
    	}
    	return
    }
    
    
    randombalance
    type RandomBalance struct {
    	name string
    }
    
    func NewRandomBalance() LoadBalance {
    	return &RandomBalance{
    		name: "random",
    	}
    }
    
    func (r *RandomBalance) Name() string {
    	return r.name
    }
    
    func (r *RandomBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
    
    	if len(nodes) == 0 {
    		err = errno.NotHaveInstance
    		return
    	}
    
    	defer func() {
    		if node != nil {
    			setSelected(ctx, node)
    		}
    	}()
    
    	var newNodes  = filterNodes(ctx, nodes)
    	if len(newNodes) == 0 {
    		err = errno.AllNodeFailed
    		return
    	}
    
    	var totalWeight int
    	for _, val := range newNodes {
    		if val.Weight == 0 {
    			val.Weight = DefaultNodeWeight
    		}
    		totalWeight += val.Weight
    	}
    
    	curWeight := rand.Intn(totalWeight)
    	curIndex := -1
    	for index, node := range nodes {
    		curWeight -= node.Weight
    		if curWeight < 0 {
    			curIndex = index
    			break
    		}
    	}
    
    	if curIndex == -1 {
    		err = errno.AllNodeFailed
    		return
    	}
    
    	node = nodes[curIndex]
    	return
    }
    
    roundrobin
    type RoundRobinBalance struct {
    	name  string
    	index int
    }
    
    func NewRoundRobinBalance() LoadBalance {
    	return &RoundRobinBalance{
    		name: "roundrobin",
    	}
    }
    
    func (r *RoundRobinBalance) Name() string {
    	return r.name
    }
    
    func (r *RoundRobinBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
    
    	if len(nodes) == 0 {
    		err = errno.NotHaveInstance
    		return
    	}
    
    	defer func() {
    		if node != nil {
    			setSelected(ctx, node)
    		}
    	}()
    
    	var newNodes = filterNodes(ctx, nodes)
    	if len(newNodes) == 0 {
    		err = errno.AllNodeFailed
    		return
    	}
    
    	r.index = (r.index + 1) % len(nodes)
    	node = nodes[r.index]
    	return
    }
    

    中间件实现

    func NewLoadBalanceMiddleware(balancer loadbalance.LoadBalance) Middleware {
    	return func(next MiddlewareFunc) MiddlewareFunc {
    		return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    			//从ctx获取rpc的metadata
    			rpcMeta := meta.GetRpcMeta(ctx)
    			if len(rpcMeta.AllNodes) == 0 {
    				err = errno.NotHaveInstance
    				logs.Error(ctx, "not have instance")
    				return
    			}
    			//生成loadbalance的上下文,用来过滤已经选择的节点
    			ctx = loadbalance.WithBalanceContext(ctx)
    			for {
    				rpcMeta.CurNode, err = balancer.Select(ctx, rpcMeta.AllNodes)
    				if err != nil {
    					return
    				}
    
    				logs.Debug(ctx, "select node:%##v", rpcMeta.CurNode)
    				rpcMeta.HistoryNodes = append(rpcMeta.HistoryNodes, rpcMeta.CurNode)
    				resp, err = next(ctx, req)
    				if err != nil {
    					//连接错误的话,进行重试
    					if errno.IsConnectError(err) {
    						continue
    					}
    					return
    				}
    				break
    			}
    			return
    		}
    	}
    }
    

    监测中间件

    • 基于prometheus

    配置

    // 服务端采样打点
    type Metrics struct {
    	requestCounter *prom.CounterVec
    	codeCounter    *prom.CounterVec
    	latencySummary *prom.SummaryVec
    }
    
    //生成server metrics实例
    func NewServerMetrics() *Metrics {
    	return &Metrics{
    		requestCounter: promauto.NewCounterVec(
    			prom.CounterOpts{
    				Name: "zhy_server_request_total",
    				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
    			}, []string{"service", "method"}),
    		codeCounter: promauto.NewCounterVec(
    			prom.CounterOpts{
    				Name: "zhy_server_handled_code_total",
    				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
    			}, []string{"service", "method", "grpc_code"}),
    		latencySummary: promauto.NewSummaryVec(
    			prom.SummaryOpts{
    				Name:       "zhy_proc_cost",
    				Help:       "RPC latency distributions.",
    				Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
    			},
    			[]string{"service", "method"},
    		),
    	}
    }
    
    //生成server metrics实例
    func NewRpcMetrics() *Metrics {
    	return &Metrics{
    		requestCounter: promauto.NewCounterVec(
    			prom.CounterOpts{
    				Name: "zhy_rpc_call_total",
    				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
    			}, []string{"service", "method"}),
    		codeCounter: promauto.NewCounterVec(
    			prom.CounterOpts{
    				Name: "zhy_rpc_code_total",
    				Help: "Total number of RPCs completed on the server, regardless of success or failure.",
    			}, []string{"service", "method", "grpc_code"}),
    		latencySummary: promauto.NewSummaryVec(
    			prom.SummaryOpts{
    				Name:       "zhy_rpc_cost",
    				Help:       "RPC latency distributions.",
    				Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
    			},
    			[]string{"service", "method"},
    		),
    	}
    }
    
    func (m *Metrics) IncrRequest(ctx context.Context, serviceName, methodName string) {
    	m.requestCounter.WithLabelValues(serviceName, methodName).Inc()
    }
    
    func (m *Metrics) IncrCode(ctx context.Context, serviceName, methodName string, err error) {
    	st, _ := status.FromError(err)
    	m.codeCounter.WithLabelValues(serviceName, methodName, st.Code().String()).Inc()
    }
    
    func (m *Metrics) Latency(ctx context.Context, serviceName, methodName string, us int64) {
    
    	m.latencySummary.WithLabelValues(serviceName, methodName).Observe(float64(us))
    }
    

    中间件实现

    var (
    	DefaultServerMetrics = prometheus.NewServerMetrics()
    	DefaultRpcMetrics    = prometheus.NewRpcMetrics()
    )
    
    func init() {
    	go func() {
    		http.Handle("/metrics", promhttp.Handler())
    		addr := fmt.Sprintf("0.0.0.0:%d", 8888)
    		http.ListenAndServe(addr, nil)
    	}()
    }
    
    // 服务端中间件
    func PrometheusServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    
    		serverMeta := meta.GetServerMeta(ctx)
    		DefaultServerMetrics.IncrRequest(ctx, serverMeta.ServiceName, serverMeta.Method)
    
    		startTime := time.Now()
    		resp, err = next(ctx, req)
    
    		DefaultServerMetrics.IncrCode(ctx, serverMeta.ServiceName, serverMeta.Method, err)
    		DefaultServerMetrics.Latency(ctx, serverMeta.ServiceName,
    			serverMeta.Method, time.Since(startTime).Nanoseconds()/1000)
    		return
    	}
    }
    
    // 客户端的中间件
    func PrometheusClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    
    		rpcMeta := meta.GetRpcMeta(ctx)
    		DefaultRpcMetrics.IncrRequest(ctx, rpcMeta.ServiceName, rpcMeta.Method)
    
    		startTime := time.Now()
    		resp, err = next(ctx, req)
    
    		DefaultRpcMetrics.IncrCode(ctx, rpcMeta.ServiceName, rpcMeta.Method, err)
    		DefaultRpcMetrics.Latency(ctx, rpcMeta.ServiceName,
    			rpcMeta.Method, time.Since(startTime).Nanoseconds()/1000)
    		return
    	}
    }
    

    分布式追踪中间件

    配置

    const (
    	binHdrSuffix = "-bin"
    )
    
    // metadataTextMap extends a metadata.MD to be an opentracing textmap
    type metadataTextMap metadata.MD
    
    // Set is a opentracing.TextMapReader interface that extracts values.
    func (m metadataTextMap) Set(key, val string) {
    	// gRPC allows for complex binary values to be written.
    	encodedKey, encodedVal := encodeKeyValue(key, val)
    	// The metadata object is a multimap, and previous values may exist, but for opentracing headers, we do not append
    	// we just override.
    	m[encodedKey] = []string{encodedVal}
    }
    
    // ForeachKey is a opentracing.TextMapReader interface that extracts values.
    func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
    	for k, vv := range m {
    		for _, v := range vv {
    			if decodedKey, decodedVal, err := metadata.DecodeKeyValue(k, v); err == nil {
    				if err = callback(decodedKey, decodedVal); err != nil {
    					return err
    				}
    			} else {
    				return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
    			}
    		}
    	}
    	return nil
    }
    
    // encodeKeyValue encodes key and value qualified for transmission via gRPC.
    // note: copy pasted from private values of grpc.metadata
    func encodeKeyValue(k, v string) (string, string) {
    	k = strings.ToLower(k)
    	if strings.HasSuffix(k, binHdrSuffix) {
    		val := base64.StdEncoding.EncodeToString([]byte(v))
    		v = string(val)
    	}
    	return k, v
    }
    
    
    func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) {
    
    	transport := transport.NewHTTPTransport(
    		reportAddr,
    		transport.HTTPBatchSize(16),
    	)
    
    
    	cfg := &config.Configuration{
    		Sampler: &config.SamplerConfig{
    			Type:  sampleType,
    			Param: rate,
    		},
    		Reporter: &config.ReporterConfig{
    			LogSpans: true,
    		},
    	}
    
    	r := jaeger.NewRemoteReporter(transport)
    	tracer, closer, err := cfg.New(serviceName,
    		config.Logger(jaeger.StdLogger),
    		config.Reporter(r))
    	if err != nil {
    		fmt.Printf("ERROR: cannot init Jaeger: %v
    ", err)
    		return
    	}
    
    	_ = closer
    	opentracing.SetGlobalTracer(tracer)
    	return
    }
    

    中间件实现

    func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    		//从ctx获取grpc的metadata
    		md, ok := metadata.FromIncomingContext(ctx)
    		if !ok {
    			//没有的话,新建一个
    			md = metadata.Pairs()
    		}
    
    		tracer := opentracing.GlobalTracer()
    		parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
    		if err != nil && err != opentracing.ErrSpanContextNotFound {
    			logs.Warn(ctx, "trace extract failed, parsing trace information: %v", err)
    		}
    
    		serverMeta := meta.GetServerMeta(ctx)
    		//开始追踪该方法
    		serverSpan := tracer.StartSpan(
    			serverMeta.Method,
    			ext.RPCServerOption(parentSpanContext),
    			ext.SpanKindRPCServer,
    		)
    
    		serverSpan.SetTag(util.TraceID, logs.GetTraceId(ctx))
    		ctx = opentracing.ContextWithSpan(ctx, serverSpan)
    		resp, err = next(ctx, req)
    		//记录错误
    		if err != nil {
    			ext.Error.Set(serverSpan, true)
    			serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
    		}
    
    		serverSpan.Finish()
    		return
    	}
    }
    
    func TraceClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
    	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
    
    		tracer := opentracing.GlobalTracer()
    		var parentSpanCtx opentracing.SpanContext
    		if parent := opentracing.SpanFromContext(ctx); parent != nil {
    			parentSpanCtx = parent.Context()
    		}
    
    		opts := []opentracing.StartSpanOption{
    			opentracing.ChildOf(parentSpanCtx),
    			ext.SpanKindRPCClient,
    			opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"},
    			opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)},
    		}
    
    		rpcMeta := meta.GetRpcMeta(ctx)
    		clientSpan := tracer.StartSpan(rpcMeta.ServiceName, opts...)
    
    		md, ok := metadata.FromOutgoingContext(ctx)
    		if !ok {
    			md = metadata.Pairs()
    		}
    
    		if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
    			logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", err)
    		}
    
    		ctx = metadata.NewOutgoingContext(ctx, md)
    		ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx))
    		ctx = opentracing.ContextWithSpan(ctx, clientSpan)
    
    		resp, err = next(ctx, req)
    		//记录错误
    		if err != nil {
    			ext.Error.Set(clientSpan, true)
    			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
    		}
    
    		clientSpan.Finish()
    		return
    	}
    }
    
    
  • 相关阅读:
    接口测试常见bug
    软件测试面试题含答案
    每个测试都该知道的测试用例方法及细节设计
    从“如何测试一个杯子”理解功能、界面、性能、安全测试?
    小白必看:测试人有必要参考的软件测试工作规范
    经验分享:给软件测试人员15个最好的测试管理工具
    DFS路径规划
    Trian(列车调度)
    GAIA
    CSWS_E_ROB深度估计方法
  • 原文地址:https://www.cnblogs.com/zhaohaiyu/p/13639010.html
Copyright © 2011-2022 走看看