zoukankan      html  css  js  c++  java
  • lease.go

    package clientv3

    import (
        "sync"
        "time"

        "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
        pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
        "golang.org/x/net/context"
        "google.golang.org/grpc"
    )

    type (
        // LeaseRevokeResponse shares its underlying type with the protobuf
        // revoke response, so an RPC result can be converted to it directly.
        LeaseRevokeResponse pb.LeaseRevokeResponse
        // LeaseID is the client-side identifier of a lease. It is converted
        // to and from the int64 IDs carried in the protobuf messages.
        LeaseID             int64
    )

    // LeaseGrantResponse is used to convert the protobuf grant response.
    type LeaseGrantResponse struct {
        // ResponseHeader is the header taken from the protobuf response.
        *pb.ResponseHeader
        // ID is the lease ID reported by the grant response.
        ID    LeaseID
        // TTL is the TTL reported by the grant response (presumably in
        // seconds, like the other lease TTLs in this file).
        TTL   int64
        // Error is copied verbatim from the protobuf response's Error field.
        Error string
    }

    // LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
    type LeaseKeepAliveResponse struct {
        // ResponseHeader is the header taken from the protobuf response.
        *pb.ResponseHeader
        // ID is the lease the response refers to.
        ID  LeaseID
        // TTL is the TTL reported by the response. The lessor multiplies it by
        // time.Second when computing deadlines and treats a non-positive value
        // in the streaming keepalive path as an expired lease.
        TTL int64
    }

    // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
    type LeaseTimeToLiveResponse struct {
        // ResponseHeader is the header taken from the protobuf response.
        *pb.ResponseHeader
        // ID is the lease the response describes.
        ID LeaseID `json:"id"`

        // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
        TTL int64 `json:"ttl"`

        // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
        GrantedTTL int64 `json:"granted-ttl"`

        // Keys is the list of keys attached to this lease.
        Keys [][]byte `json:"keys"`
    }

    const (
        // defaultTTL is the assumed lease TTL used for the first keepalive
        // deadline before the actual TTL is known to the client. NewLease
        // falls back to it when the dial timeout plus one second equals
        // exactly one second.
        defaultTTL = 5 * time.Second
        // leaseResponseChSize is the buffer capacity of each channel returned
        // by KeepAlive; it is a small buffer to store unsent lease responses.
        leaseResponseChSize = 16
        // NoLease is a lease ID for the absence of a lease.
        NoLease LeaseID = 0
    )

    // Lease is the client-side API for granting, revoking, inspecting and
    // renewing leases. NewLease returns the implementation in this file.
    type Lease interface {
        // Grant creates a new lease. ttl is sent to the server as the TTL of
        // the grant request.
        Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

        // Revoke revokes the given lease.
        Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

        // TimeToLive retrieves the lease information of the given lease ID.
        TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

        // KeepAlive keeps the given lease alive forever. Keepalive responses
        // are delivered on the returned channel. The lessor implementation
        // drops a response when the channel's buffer is full, and closes the
        // channel when ctx is done, when a response reports a non-positive
        // TTL, when no response arrives before the lease deadline, or when
        // the receive loop stops.
        KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

        // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
        // should be used instead of KeepAliveOnce.
        KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

        // Close releases all resources Lease keeps for efficient communication
        // with the etcd server.
        Close() error
    }

    // lessor is the Lease implementation returned by NewLease. It owns one
    // shared keepalive stream plus the background goroutines that send
    // keepalive requests, receive responses and reap overdue leases.
    type lessor struct {
        mu sync.Mutex // guards all fields

        // donec is closed when recvKeepAliveLoop stops
        donec chan struct{}

        // remote is the lease RPC client used for every request.
        remote pb.LeaseClient

        // stream is the current shared keepalive stream; newStream replaces it.
        stream       pb.Lease_LeaseKeepAliveClient
        // streamCancel cancels the context the current stream was opened with.
        streamCancel context.CancelFunc

        // stopCtx is cancelled by Close (through stopCancel); contexts for
        // streams are derived from it and in-flight RPCs are cancelled when
        // it is done.
        stopCtx    context.Context
        stopCancel context.CancelFunc

        // keepAlives holds the active keepalive state for each lease ID.
        keepAlives map[LeaseID]*keepAlive

        // firstKeepAliveTimeout is the timeout for the first keepalive request
        // before the actual TTL is known to the lease client
        firstKeepAliveTimeout time.Duration
    }

    // keepAlive multiplexes a keepalive for a lease over multiple channels
    type keepAlive struct {
        // chs and ctxs are parallel slices: chs[i] is the channel handed out
        // by the KeepAlive call that supplied ctxs[i].
        chs  []chan<- *LeaseKeepAliveResponse
        ctxs []context.Context
        // deadline is the time the keep alive channels close if no response
        deadline time.Time
        // nextKeepAlive is when to send the next keep alive message
        nextKeepAlive time.Time
        // donec is closed by Close, which the lessor calls when a response
        // reports a non-positive TTL, when the deadline passes, or when the
        // receive loop stops.
        donec chan struct{}
    }

    func NewLease(c *Client) Lease {
        l := &lessor{
            donec:                 make(chan struct{}),
            keepAlives:            make(map[LeaseID]*keepAlive),
            remote:                RetryLeaseClient(c),
            firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
        }
        if l.firstKeepAliveTimeout == time.Second {
            l.firstKeepAliveTimeout = defaultTTL
        }

        l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
        go l.recvKeepAliveLoop()
        go l.deadlineLoop()
        return l
    }

    func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
        cctx, cancel := context.WithCancel(ctx)
        done := cancelWhenStop(cancel, l.stopCtx.Done())
        defer close(done)

        for {
            r := &pb.LeaseGrantRequest{TTL: ttl}
            resp, err := l.remote.LeaseGrant(cctx, r)
            if err == nil {
                gresp := &LeaseGrantResponse{
                    ResponseHeader: resp.GetHeader(),
                    ID:             LeaseID(resp.ID),
                    TTL:            resp.TTL,
                    Error:          resp.Error,
                }
                return gresp, nil
            }
            if isHaltErr(cctx, err) {
                return nil, toErr(cctx, err)
            }
            if nerr := l.newStream(); nerr != nil {
                return nil, nerr
            }
        }
    }

    func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
        cctx, cancel := context.WithCancel(ctx)
        done := cancelWhenStop(cancel, l.stopCtx.Done())
        defer close(done)

        for {
            r := &pb.LeaseRevokeRequest{ID: int64(id)}
            resp, err := l.remote.LeaseRevoke(cctx, r)

            if err == nil {
                return (*LeaseRevokeResponse)(resp), nil
            }
            if isHaltErr(ctx, err) {
                return nil, toErr(ctx, err)
            }
            if nerr := l.newStream(); nerr != nil {
                return nil, nerr
            }
        }
    }

    func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
        cctx, cancel := context.WithCancel(ctx)
        done := cancelWhenStop(cancel, l.stopCtx.Done())
        defer close(done)

        for {
            r := toLeaseTimeToLiveRequest(id, opts...)
            resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false))
            if err == nil {
                gresp := &LeaseTimeToLiveResponse{
                    ResponseHeader: resp.GetHeader(),
                    ID:             LeaseID(resp.ID),
                    TTL:            resp.TTL,
                    GrantedTTL:     resp.GrantedTTL,
                    Keys:           resp.Keys,
                }
                return gresp, nil
            }
            if isHaltErr(cctx, err) {
                return nil, toErr(cctx, err)
            }
        }
    }

    func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
        ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)

        l.mu.Lock()
        ka, ok := l.keepAlives[id]
        if !ok {
            // create fresh keep alive
            ka = &keepAlive{
                chs:           []chan<- *LeaseKeepAliveResponse{ch},
                ctxs:          []context.Context{ctx},
                deadline:      time.Now().Add(l.firstKeepAliveTimeout),
                nextKeepAlive: time.Now(),
                donec:         make(chan struct{}),
            }
            l.keepAlives[id] = ka
        } else {
            // add channel and context to existing keep alive
            ka.ctxs = append(ka.ctxs, ctx)
            ka.chs = append(ka.chs, ch)
        }
        l.mu.Unlock()

        go l.keepAliveCtxCloser(id, ctx, ka.donec)

        return ch, nil
    }

    func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
        cctx, cancel := context.WithCancel(ctx)
        done := cancelWhenStop(cancel, l.stopCtx.Done())
        defer close(done)

        for {
            resp, err := l.keepAliveOnce(cctx, id)
            if err == nil {
                if resp.TTL == 0 {
                    err = rpctypes.ErrLeaseNotFound
                }
                return resp, err
            }
            if isHaltErr(ctx, err) {
                return nil, toErr(ctx, err)
            }

            if nerr := l.newStream(); nerr != nil {
                return nil, nerr
            }
        }
    }

    // Close cancels the lessor's stop context and then blocks until donec is
    // closed, which recvKeepAliveLoop does when it returns. It always returns
    // nil.
    func (l *lessor) Close() error {
        // Cancel before waiting so the background loops are told to stop.
        l.stopCancel()
        <-l.donec
        return nil
    }

    func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
        select {
        case <-donec:
            return
        case <-l.donec:
            return
        case <-ctx.Done():
        }

        l.mu.Lock()
        defer l.mu.Unlock()

        ka, ok := l.keepAlives[id]
        if !ok {
            return
        }

        // close channel and remove context if still associated with keep alive
        for i, c := range ka.ctxs {
            if c == ctx {
                close(ka.chs[i])
                ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
                ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
                break
            }
        }
        // remove if no one more listeners
        if len(ka.chs) == 0 {
            delete(l.keepAlives, id)
        }
    }

    func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
        cctx, cancel := context.WithCancel(ctx)
        defer cancel()

        stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
        if err != nil {
            return nil, toErr(ctx, err)
        }

        err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
        if err != nil {
            return nil, toErr(ctx, err)
        }

        resp, rerr := stream.Recv()
        if rerr != nil {
            return nil, toErr(ctx, rerr)
        }

        karesp := &LeaseKeepAliveResponse{
            ResponseHeader: resp.GetHeader(),
            ID:             LeaseID(resp.ID),
            TTL:            resp.TTL,
        }
        return karesp, nil
    }

    func (l *lessor) recvKeepAliveLoop() {
        defer func() {
            l.mu.Lock()
            close(l.donec)
            for _, ka := range l.keepAlives {
                ka.Close()
            }
            l.keepAlives = make(map[LeaseID]*keepAlive)
            l.mu.Unlock()
        }()

        stream, serr := l.resetRecv()
        for serr == nil {
            resp, err := stream.Recv()
            if err != nil {
                if isHaltErr(l.stopCtx, err) {
                    return
                }
                stream, serr = l.resetRecv()
                continue
            }
            l.recvKeepAlive(resp)
        }
    }

    // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
    func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
        if err := l.newStream(); err != nil {
            return nil, err
        }
        stream := l.getKeepAliveStream()
        go l.sendKeepAliveLoop(stream)
        return stream, nil
    }

    // recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
    func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
        karesp := &LeaseKeepAliveResponse{
            ResponseHeader: resp.GetHeader(),
            ID:             LeaseID(resp.ID),
            TTL:            resp.TTL,
        }

        l.mu.Lock()
        defer l.mu.Unlock()

        ka, ok := l.keepAlives[karesp.ID]
        if !ok {
            return
        }

        if karesp.TTL <= 0 {
            // lease expired; close all keep alive channels
            delete(l.keepAlives, karesp.ID)
            ka.Close()
            return
        }

        // send update to all channels
        nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
        ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
        for _, ch := range ka.chs {
            select {
            case ch <- karesp:
                ka.nextKeepAlive = nextKeepAlive
            default:
            }
        }
    }

    // deadlineLoop reaps any keep alive channels that have not received a response
    // within the lease TTL
    func (l *lessor) deadlineLoop() {
        for {
            select {
            case <-time.After(time.Second):
            case <-l.donec:
                return
            }
            now := time.Now()
            l.mu.Lock()
            for id, ka := range l.keepAlives {
                if ka.deadline.Before(now) {
                    // waited too long for response; lease may be expired
                    ka.Close()
                    delete(l.keepAlives, id)
                }
            }
            l.mu.Unlock()
        }
    }

    // sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
    func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
        for {
            select {
            case <-time.After(500 * time.Millisecond):
            case <-stream.Context().Done():
                return
            case <-l.donec:
                return
            case <-l.stopCtx.Done():
                return
            }

            var tosend []LeaseID

            now := time.Now()
            l.mu.Lock()
            for id, ka := range l.keepAlives {
                if ka.nextKeepAlive.Before(now) {
                    tosend = append(tosend, id)
                }
            }
            l.mu.Unlock()

            for _, id := range tosend {
                r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
                if err := stream.Send(r); err != nil {
                    // TODO do something with this error?
                    return
                }
            }
        }
    }

    func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
        l.mu.Lock()
        defer l.mu.Unlock()
        return l.stream
    }

    func (l *lessor) newStream() error {
        sctx, cancel := context.WithCancel(l.stopCtx)
        stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
        if err != nil {
            cancel()
            return toErr(sctx, err)
        }

        l.mu.Lock()
        defer l.mu.Unlock()
        if l.stream != nil && l.streamCancel != nil {
            l.stream.CloseSend()
            l.streamCancel()
        }

        l.streamCancel = cancel
        l.stream = stream
        return nil
    }

    func (ka *keepAlive) Close() {
        close(ka.donec)
        for _, ch := range ka.chs {
            close(ch)
        }
    }

    // cancelWhenStop starts a goroutine that calls cancel as soon as either
    // stopc fires or the returned channel fires. The caller should close (or
    // send on) the returned channel when its work is finished so that the
    // goroutine is released.
    func cancelWhenStop(cancel context.CancelFunc, stopc <-chan struct{}) chan<- struct{} {
        finished := make(chan struct{}, 1)

        go func() {
            // cancel runs whichever of the two signals arrives first
            defer cancel()
            select {
            case <-finished:
            case <-stopc:
            }
        }()

        return finished
    }

  • 相关阅读:
    [Aaronyang] 写给自己的WPF4.5 笔记23 [3d交互与动画 4/4]
    [Aaronyang] 写给自己的WPF4.5 笔记22 [3d交互与动画 3/4]
    [Aaronyang] 写给自己的WPF4.5 笔记21 [3d课 2/4]
    [Aaronyang] 写给自己的WPF4.5 笔记20 [3d课 1/4]
    [Aaronyang] 写给自己的WPF4.5 笔记19[Visual类图文并茂讲解]
    [Aaronyang] 写给自己的WPF4.5 笔记18[几何图形*Geometry图文并茂讲解]
    [Aaronyang紫色博客] 写给自己的WPF4.5-Blend5公开课系列 3
    [Aaronyang紫色博客] 写给自己的WPF4.5-Blend5公开课系列 2-更进一步
    [Aaronyang紫色博客] 写给自己的WPF4.5-Blend5公开课系列 1
    [Aaronyang] 写给自己的WPF4.5 笔记15[AyArc诞生-WPF版本绚丽的环状图,Ay制作,AyWindow强势预览]
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452677.html
Copyright © 2011-2022 走看看