zoukankan      html  css  js  c++  java
  • go-micro client 客户端

      go-micro 支持很多通信协议:http、tcp、grpc等,支持的编码方式也很多有json、protobuf、bytes、jsonrpc等。也可以根据自己的需要实现通信协议和编码方式。go-micro 默认的通信协议是http,默认的编码方式是protobuf。

      主要代码定义如下:

    // Client is the interface used to make requests to services.
    // It supports Request/Response via Transport and Publishing via the Broker.
    // It also supports bidiectional streaming of requests.
    type Client interface {
    	Init(...Option) error
    	Options() Options
    	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
    	NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
    	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
    	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
    	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
    	String() string
    }
    
    // Message is the interface for publishing asynchronously
    type Message interface {
    	Topic() string
    	Payload() interface{}
    	ContentType() string
    }
    
    // Request is the interface for a synchronous request used by Call or Stream
    type Request interface {
    	Service() string
    	Method() string
    	ContentType() string
    	Request() interface{}
    	// indicates whether the request will be a streaming one rather than unary
    	Stream() bool
    }
    
    // Stream is the inteface for a bidirectional synchronous stream
    type Stream interface {
    	Context() context.Context
    	Request() Request
    	Send(interface{}) error
    	Recv(interface{}) error
    	Error() error
    	Close() error
    }
    
    // Option used by the Client
    type Option func(*Options)
    
    // CallOption used by Call or Stream
    type CallOption func(*CallOptions)
    
    // PublishOption used by Publish
    type PublishOption func(*PublishOptions)
    
    // MessageOption used by NewMessage
    type MessageOption func(*MessageOptions)
    
    // RequestOption used by NewRequest
    type RequestOption func(*RequestOptions)
    

      

      client的连接使用了pool,减少生成和销毁的开销,不够用的话就另外生成:

    func newPool(size int, ttl time.Duration) *pool {
    	return &pool{
    		size:  size,
    		ttl:   int64(ttl.Seconds()),
    		conns: make(map[string][]*poolConn),
    	}
    }
    
    // NoOp the Close since we manage it
    func (p *poolConn) Close() error {
    	return nil
    }
    
    func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) {
    	p.Lock()
    	conns := p.conns[addr]
    	now := time.Now().Unix()
    
    	// while we have conns check age and then return one
    	// otherwise we'll create a new conn
    	for len(conns) > 0 {
    		conn := conns[len(conns)-1]
    		conns = conns[:len(conns)-1]
    		p.conns[addr] = conns
    
    		// if conn is old kill it and move on
    		if d := now - conn.created; d > p.ttl {
    			conn.Client.Close()
    			continue
    		}
    
    		// we got a good conn, lets unlock and return it
    		p.Unlock()
    
    		return conn, nil
    	}
    
    	p.Unlock()
    
    	// create new conn
    	c, err := tr.Dial(addr, opts...)
    	if err != nil {
    		return nil, err
    	}
    	return &poolConn{c, time.Now().Unix()}, nil
    }
    
    func (p *pool) release(addr string, conn *poolConn, err error) {
    	// don't store the conn if it has errored
    	if err != nil {
    		conn.Client.Close()
    		return
    	}
    
    	// otherwise put it back for reuse
    	p.Lock()
    	conns := p.conns[addr]
    	if len(conns) >= p.size {
    		p.Unlock()
    		conn.Client.Close()
    		return
    	}
    	p.conns[addr] = append(conns, conn)
    	p.Unlock()
    }
    

      封装request:

    type rpcRequest struct {
    	service     string
    	method      string
    	contentType string
    	request     interface{}
    	opts        RequestOptions
    }
    
    func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
    	var opts RequestOptions
    
    	for _, o := range reqOpts {
    		o(&opts)
    	}
    
    	// set the content-type specified
    	if len(opts.ContentType) > 0 {
    		contentType = opts.ContentType
    	}
    
    	return &rpcRequest{
    		service:     service,
    		method:      method,
    		request:     request,
    		contentType: contentType,
    		opts:        opts,
    	}
    }
    

      

      调用接口:

    func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
    	// make a copy of call opts
    	callOpts := r.opts.CallOptions
    	for _, opt := range opts {
    		opt(&callOpts)
    	}
    
    	next, err := r.next(request, callOpts)
    	if err != nil {
    		return err
    	}
    
    	// check if we already have a deadline
    	d, ok := ctx.Deadline()
    	if !ok {
    		// no deadline so we create a new one
    		ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
    	} else {
    		// got a deadline so no need to setup context
    		// but we need to set the timeout we pass along
    		opt := WithRequestTimeout(d.Sub(time.Now()))
    		opt(&callOpts)
    	}
    
    	// should we noop right here?
    	select {
    	case <-ctx.Done():
    		return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    	default:
    	}
    
    	// make copy of call method
    	rcall := r.call
    
    	// wrap the call in reverse
    	for i := len(callOpts.CallWrappers); i > 0; i-- {
    		rcall = callOpts.CallWrappers[i-1](rcall)
    	}
    
    	// return errors.New("go.micro.client", "request timeout", 408)
    	call := func(i int) error {
    		// call backoff first. Someone may want an initial start delay
    		t, err := callOpts.Backoff(ctx, request, i)
    		if err != nil {
    			return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
    		}
    
    		// only sleep if greater than 0
    		if t.Seconds() > 0 {
    			time.Sleep(t)
    		}
    
    		// select next node
    		node, err := next()
    		if err != nil && err == selector.ErrNotFound {
    			return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
    		} else if err != nil {
    			return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
    		}
    
    		// set the address
    		address := node.Address
    		if node.Port > 0 {
    			address = fmt.Sprintf("%s:%d", address, node.Port)
    		}
    
    		// make the call
    		err = rcall(ctx, address, request, response, callOpts)
    		r.opts.Selector.Mark(request.Service(), node, err)
    		return err
    	}
    
    	ch := make(chan error, callOpts.Retries)
    	var gerr error
    
    	for i := 0; i <= callOpts.Retries; i++ {
    		go func(i int) {
    			ch <- call(i)
    		}(i)
    
    		select {
    		case <-ctx.Done():
    			return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
    		case err := <-ch:
    			// if the call succeeded lets bail early
    			if err == nil {
    				return nil
    			}
    
    			retry, rerr := callOpts.Retry(ctx, request, i, err)
    			if rerr != nil {
    				return rerr
    			}
    
    			if !retry {
    				return err
    			}
    
    			gerr = err
    		}
    	}
    
    	return gerr
    }
    

      

    you are the best!
  • 相关阅读:
    PHP install perl module
    PHP 静态页
    PHP对类的操作
    PHP Mysql操作。
    2020.7.16
    2020.7.19
    2020.7.14
    2020.7.12
    2020.7.17
    2020.7.10
  • 原文地址:https://www.cnblogs.com/linguoguo/p/14751436.html
Copyright © 2011-2022 走看看