zoukankan      html  css  js  c++  java
  • Golang微服务:Micro限流、熔断

    Wrapper

    Wrapper提供了一种包装机制,使得在执行某方法前先执行Wrapper,优点Filter的意思;因此可以在客户端和服务器做很多功能:熔断限流、Filter、Auth等。
    client代码如下:调用greeter.Hello时先执行logWrap.Call方法,再调用RPC请求。

    // log wrapper logs every time a request is made
    type logWrapper struct {
    	client.Client
    }
    
    func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    	fmt.Printf("[wrapper] client request service: %s method: %s
    ", req.Service(), req.Method())
    	return l.Client.Call(ctx, req, rsp)
    }
    
    // Implements client.Wrapper as logWrapper
    func logWrap(c client.Client) client.Client {
    	return &logWrapper{c}
    }
    
    func main() {
    	service := micro.NewService(
    		micro.Name("greeter.client"),
    		micro.Registry(mdns.NewRegistry()),
    		// wrap the client
    		micro.WrapClient(logWrap),
    	)
    
    	service.Init()
    
    	greeter := proto.NewGreeterService("greeter", service.Client())
    
    	rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    
    	fmt.Println(rsp.Greeting)
    
    }
    

    server代码如下:当RPC调用进来时先执行logWrapper,再执行Hello

    type Greeter struct{}
    
    func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
    	rsp.Greeting = "Hello " + req.Name
    	log.Println("in Hello")
    	return nil
    }
    
    // logWrapper is a handler wrapper
    func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
    	return func(ctx context.Context, req server.Request, rsp interface{}) error {
    		log.Printf("[wrapper] server request: %v", req.Method())
    		err := fn(ctx, req, rsp)
    		return err
    	}
    }
    
    func main() {
    	service := micro.NewService(
    		micro.Name("greeter"),
    		// wrap the handler
    		micro.WrapHandler(logWrapper),
    		micro.Registry(mdns.NewRegistry()),
    	)
    
    	service.Init()
    
    	proto.RegisterGreeterHandler(service.Server(), new(Greeter))
    
    	if err := service.Run(); err != nil {
    		fmt.Println(err)
    	}
    }
    

    熔断

    Micro提供了两种实现,gobreaker和hystrix,熔断是在客户端实现。先看看 hystrix:

    var (
    	// DefaultTimeout is how long to wait for command to complete, in milliseconds
    	DefaultTimeout = 1000
    	// DefaultMaxConcurrent is how many commands of the same type can run at the same time
    	DefaultMaxConcurrent = 10
    	// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
    	DefaultVolumeThreshold = 20
    	// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
    	DefaultSleepWindow = 5000
    	// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
    	DefaultErrorPercentThreshold = 50
    	// DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
    	DefaultLogger = NoopLogger{}
    )
    
    type Settings struct {
    	Timeout                time.Duration
    	MaxConcurrentRequests  int
    	RequestVolumeThreshold uint64
    	SleepWindow            time.Duration
    	ErrorPercentThreshold  int
    }
    

    hystrix会根据这5个参数(超时时间、并发请求数、请求量、空歇床、错误率)来选择合适的服务进行调度,目前是使用的 hystrix提供的默认参数,不支持自定义参数,示例:

    func TestBreaker(t *testing.T) {
    	// setup
    	r := mock.NewRegistry()
    	s := selector.NewSelector(selector.Registry(r))
    
    	c := client.NewClient(
    		// set the selector
    		client.Selector(s),
    		// add the breaker wrapper
    		client.Wrap(NewClientWrapper()),
    	)
    
    	req := c.NewRequest("test.service", "Test.Method", map[string]string{
    		"foo": "bar",
    	}, client.WithContentType("application/json"))
    
    	var rsp map[string]interface{}
    
    	// Force to point of trip
    	for i := 0; i < (hystrix.DefaultVolumeThreshold * 3); i++ {
    		c.Call(context.TODO(), req, rsp)
    	}
    
    	err := c.Call(context.TODO(), req, rsp)
    	if err == nil {
    		t.Error("Expecting tripped breaker, got nil error")
    	}
    
    	if err.Error() != "hystrix: circuit open" {
    		t.Errorf("Expecting tripped breaker, got %v", err)
    	}
    }
    

    gobreaker方案与hystrix类似,可以自定义参数。

    限流

    ratelimit可以在客户端做,也可以在服务端做;micro提供了两种方案:juju/ratelimituber/ratelimit

    客户端实现:

    func TestRateClientLimit(t *testing.T) {
    	// setup
    	r := mock.NewRegistry()
    	s := selector.NewSelector(selector.Registry(r))
    
    	testRates := []int{1, 10, 20, 100}
    
    	for _, limit := range testRates {
    		b := ratelimit.NewBucketWithRate(float64(limit), int64(limit))
    
    		c := client.NewClient(
    			// set the selector
    			client.Selector(s),
    			// add the breaker wrapper
    			client.Wrap(NewClientWrapper(b, false)),//fasle=快速失败?
    		)
    
    		req := c.NewRequest(
    			"test.service",
    			"Test.Method",
    			&TestRequest{},
    			client.WithContentType("application/json"),
    		)
    		rsp := TestResponse{}
    
    		for j := 0; j < limit; j++ {
    			err := c.Call(context.TODO(), req, &rsp)
    			e := errors.Parse(err.Error())
    			if e.Code == 429 {
    				t.Errorf("Unexpected rate limit error: %v", err)
    			}
    		}
    
    		err := c.Call(context.TODO(), req, rsp)
    		e := errors.Parse(err.Error())
    		if e.Code != 429 {
    			t.Errorf("Expected rate limit error, got: %v", err)
    		}
    	}
    }
    
    • NewBucketWithRate入参为速率(QPS)和容量(CAP),比如每秒5个请求,最大保持50个活动的请求
    • NewClientWrapper第二个参数wait,指示当受到限流时是否等待,如果是false即快速失败,返回(429,too mant request)

    服务端实现(以下代码包含了客户端测试代码):

    func TestRateServerLimit(t *testing.T) {
    	// setup
    	r := mock.NewRegistry()
    	s := selector.NewSelector(selector.Registry(r))
    
    	testRates := []int{1, 10, 20}
    
    	for _, limit := range testRates {
    		b := ratelimit.NewBucketWithRate(float64(limit), int64(limit))
    		c := client.NewClient(client.Selector(s))
    
    		name := fmt.Sprintf("test.service.%d", limit)
    
    		s := server.NewServer(
    			server.Name(name),
    			// add registry
    			server.Registry(r),
    			// add the breaker wrapper
    			server.WrapHandler(NewHandlerWrapper(b, false)),
    		)
    
    		type Test struct {
    			*testHandler
    		}
    
    		s.Handle(
    			s.NewHandler(&Test{new(testHandler)}),
    		)
    
    		if err := s.Start(); err != nil {
    			t.Fatalf("Unexpected error starting server: %v", err)
    		}
    
    		if err := s.Register(); err != nil {
    			t.Fatalf("Unexpected error registering server: %v", err)
    		}
    
    		req := c.NewRequest(name, "Test.Method", &TestRequest{}, client.WithContentType("application/json"))
    		rsp := TestResponse{}
    
    		for j := 0; j < limit; j++ {
    			if err := c.Call(context.TODO(), req, &rsp); err != nil {
    				t.Fatalf("Unexpected request error: %v", err)
    			}
    		}
    
    		err := c.Call(context.TODO(), req, &rsp)
    		if err == nil {
    			t.Fatalf("Expected rate limit error, got nil: rate %d, err %v", limit, err)
    		}
    
    		e := errors.Parse(err.Error())
    		if e.Code != 429 {
    			t.Fatalf("Expected rate limit error, got %v", err)
    		}
    
    		s.Deregister()
    		s.Stop()
    
    		// artificial test delay
    		time.Sleep(time.Millisecond * 20)
    	}
    }
    
    

    client.NewClient支持多个Wrapper,将熔断限流功能都添加上

    	import "github.com/micro/go-plugins/wrapper/breaker/hystrix"
    	import "github.com/micro/go-plugins/wrapper/ratelimiter/ratelimit"
    	c := client.NewClient(
    		client.Wrap(ratelimit.NewClientWrapper(b, false)),
    		client.Wrap(hystrixNewClientWrapper()),
    	)
    
  • 相关阅读:
    Python学习-类的继承
    Python学习-else循环子句
    Python学习-类的基本知识
    Python学习-字符编码的理解
    Python小程序—修改haproxy配置文件
    Python学习-局部变量和全局变量以及递归
    蒙特卡洛积分法(三)
    蒙特卡洛积分法(二)
    蒙特卡洛积分法(一)
    由normal生成局部坐标系
  • 原文地址:https://www.cnblogs.com/cqvoip/p/9967995.html
Copyright © 2011-2022 走看看