go-micro 支持多种通信协议,如 http、tcp、grpc 等;支持的编码方式也很多,有 json、protobuf、bytes、jsonrpc 等。也可以根据自己的需要实现通信协议和编码方式。go-micro 默认的通信协议是 http,默认的编码方式是 protobuf。
主要代码定义如下:
// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
	Init(...Option) error
	Options() Options
	// NewMessage returns a Message for the given topic; opts are
	// MessageOption values.
	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
	// NewRequest returns a Request (usable with Call or Stream) for the
	// given service and method; reqOpts are RequestOption values.
	NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
	// Call makes a synchronous request; rsp is supplied by the caller.
	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
	// Stream returns a Stream for the request.
	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
	// Publish publishes msg asynchronously.
	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
	String() string
}

// Message is the interface for publishing asynchronously.
type Message interface {
	Topic() string
	Payload() interface{}
	ContentType() string
}

// Request is the interface for a synchronous request used by Call or Stream.
type Request interface {
	Service() string
	Method() string
	ContentType() string
	Request() interface{}
	// Stream indicates whether the request will be a streaming one rather
	// than unary.
	Stream() bool
}

// Stream is the interface for a bidirectional synchronous stream.
type Stream interface {
	Context() context.Context
	Request() Request
	Send(interface{}) error
	Recv(interface{}) error
	Error() error
	Close() error
}

// Option is used by the Client; it mutates an Options value in place.
type Option func(*Options)

// CallOption is used by Call or Stream; it mutates a CallOptions value in place.
type CallOption func(*CallOptions)

// PublishOption is used by Publish; it mutates a PublishOptions value in place.
type PublishOption func(*PublishOptions)

// MessageOption is used by NewMessage; it mutates a MessageOptions value in place.
type MessageOption func(*MessageOptions)

// RequestOption is used by NewRequest; it mutates a RequestOptions value in place.
type RequestOption func(*RequestOptions)
client 的连接使用了连接池(pool),以减少频繁创建和销毁连接的开销;池中没有可用连接时就新建一个:
// newPool builds an empty connection pool.
//
// size caps how many idle connections release keeps per address, and ttl is
// the maximum age of a pooled connection. The ttl is stored in whole seconds,
// so any sub-second part is dropped.
func newPool(size int, ttl time.Duration) *pool {
	p := &pool{
		conns: make(map[string][]*poolConn),
	}
	p.size = size
	p.ttl = int64(ttl.Seconds())
	return p
}

// Close is deliberately a no-op: the pool manages the connection, and the
// underlying client is closed through conn.Client.Close by getConn/release.
func (p *poolConn) Close() error {
	return nil
}

// getConn returns a connection for addr.
//
// Idle connections for addr are popped from the end of the list while the pool
// lock is held. Entries older than the pool ttl are closed and discarded; the
// first one that is still young enough is returned. If none is left, the lock
// is dropped and a new connection is dialed with tr.
func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) {
	p.Lock()
	idle := p.conns[addr]
	now := time.Now().Unix()

	for len(idle) > 0 {
		last := len(idle) - 1
		candidate := idle[last]

		// pop the candidate off the idle list before inspecting it
		idle = idle[:last]
		p.conns[addr] = idle

		if now-candidate.created <= p.ttl {
			// still within ttl: hand it out
			p.Unlock()
			return candidate, nil
		}

		// too old: close it and try the next one
		candidate.Client.Close()
	}
	p.Unlock()

	// nothing reusable, dial a fresh connection outside the lock
	client, err := tr.Dial(addr, opts...)
	if err != nil {
		return nil, err
	}
	return &poolConn{Client: client, created: time.Now().Unix()}, nil
}

// release hands conn back after use.
//
// The connection is kept for reuse only when err is nil and fewer than p.size
// idle connections are already stored for addr; in every other case the
// underlying client is closed.
func (p *pool) release(addr string, conn *poolConn, err error) {
	if err == nil {
		p.Lock()
		idle := p.conns[addr]
		if len(idle) < p.size {
			p.conns[addr] = append(idle, conn)
			p.Unlock()
			return
		}
		p.Unlock()
	}

	// errored, or the idle list is full: drop the connection
	conn.Client.Close()
}
封装request:
type rpcRequest struct { service string method string contentType string request interface{} opts RequestOptions } func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { var opts RequestOptions for _, o := range reqOpts { o(&opts) } // set the content-type specified if len(opts.ContentType) > 0 { contentType = opts.ContentType } return &rpcRequest{ service: service, method: method, request: request, contentType: contentType, opts: opts, } }
调用接口:
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { // make a copy of call opts callOpts := r.opts.CallOptions for _, opt := range opts { opt(&callOpts) } next, err := r.next(request, callOpts) if err != nil { return err } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := WithRequestTimeout(d.Sub(time.Now())) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method rcall := r.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { rcall = callOpts.CallWrappers[i-1](rcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. 
Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) if err != nil { return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) } // set the address address := node.Address if node.Port > 0 { address = fmt.Sprintf("%s:%d", address, node.Port) } // make the call err = rcall(ctx, address, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err } ch := make(chan error, callOpts.Retries) var gerr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, request, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr }