zoukankan      html  css  js  c++  java
  • channel.go

    package nsqd

    import (
        "bytes"
        "container/heap"
        "errors"
        "math"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/nsqio/nsq/internal/pqueue"
        "github.com/nsqio/nsq/internal/quantile"
    )

    type Consumer interface {
        UnPause()
        Pause()
        Close() error
        TimedOutMessage()
        Stats() ClientStats
        Empty()
    }

    // Channel represents the concrete type for a NSQ channel (and also
    // implements the Queue interface)
    //
    // There can be multiple channels per topic, each with there own unique set
    // of subscribers (clients).
    //
    // Channels maintain all client and message metadata, orchestrating in-flight
    // messages, timeouts, requeuing, etc.
    type Channel struct {
        // 64bit atomic vars need to be first for proper alignment on 32bit platforms
        requeueCount uint64
        messageCount uint64
        timeoutCount uint64

        sync.RWMutex

        topicName string
        name      string
        ctx       *context

        backend BackendQueue

        memoryMsgChan chan *Message
        exitFlag      int32
        exitMutex     sync.RWMutex

        // state tracking
        clients        map[int64]Consumer
        paused         int32
        ephemeral      bool
        deleteCallback func(*Channel)
        deleter        sync.Once

        // Stats tracking
        e2eProcessingLatencyStream *quantile.Quantile

        // TODO: these can be DRYd up
        deferredMessages map[MessageID]*pqueue.Item
        deferredPQ       pqueue.PriorityQueue
        deferredMutex    sync.Mutex
        inFlightMessages map[MessageID]*Message
        inFlightPQ       inFlightPqueue
        inFlightMutex    sync.Mutex
    }

    // NewChannel creates a new instance of the Channel type and returns a pointer
    func NewChannel(topicName string, channelName string, ctx *context,
        deleteCallback func(*Channel)) *Channel {

        c := &Channel{
            topicName:      topicName,
            name:           channelName,
            memoryMsgChan:  make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
            clients:        make(map[int64]Consumer),
            deleteCallback: deleteCallback,
            ctx:            ctx,
        }
        if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
            c.e2eProcessingLatencyStream = quantile.New(
                ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
                ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles,
            )
        }

        c.initPQ()

        if strings.HasSuffix(channelName, "#ephemeral") {
            c.ephemeral = true
            c.backend = newDummyBackendQueue()
        } else {
            // backend names, for uniqueness, automatically include the topic...
            backendName := getBackendName(topicName, channelName)
            c.backend = newDiskQueue(backendName,
                ctx.nsqd.getOpts().DataPath,
                ctx.nsqd.getOpts().MaxBytesPerFile,
                int32(minValidMsgLength),
                int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
                ctx.nsqd.getOpts().SyncEvery,
                ctx.nsqd.getOpts().SyncTimeout,
                ctx.nsqd.getOpts().Logger)
        }

        c.ctx.nsqd.Notify(c)

        return c
    }

    func (c *Channel) initPQ() {
        pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))

        c.inFlightMessages = make(map[MessageID]*Message)
        c.deferredMessages = make(map[MessageID]*pqueue.Item)

        c.inFlightMutex.Lock()
        c.inFlightPQ = newInFlightPqueue(pqSize)
        c.inFlightMutex.Unlock()

        c.deferredMutex.Lock()
        c.deferredPQ = pqueue.New(pqSize)
        c.deferredMutex.Unlock()
    }

    // Exiting returns a boolean indicating if this channel is closed/exiting
    func (c *Channel) Exiting() bool {
        return atomic.LoadInt32(&c.exitFlag) == 1
    }

    // Delete empties the channel and closes
    func (c *Channel) Delete() error {
        return c.exit(true)
    }

    // Close cleanly closes the Channel
    func (c *Channel) Close() error {
        return c.exit(false)
    }

    func (c *Channel) exit(deleted bool) error {
        c.exitMutex.Lock()
        defer c.exitMutex.Unlock()

        if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
            return errors.New("exiting")
        }

        if deleted {
            c.ctx.nsqd.logf("CHANNEL(%s): deleting", c.name)

            // since we are explicitly deleting a channel (not just at system exit time)
            // de-register this from the lookupd
            c.ctx.nsqd.Notify(c)
        } else {
            c.ctx.nsqd.logf("CHANNEL(%s): closing", c.name)
        }

        // this forceably closes client connections
        c.RLock()
        for _, client := range c.clients {
            client.Close()
        }
        c.RUnlock()

        if deleted {
            // empty the queue (deletes the backend files, too)
            c.Empty()
            return c.backend.Delete()
        }

        // write anything leftover to disk
        c.flush()
        return c.backend.Close()
    }

    func (c *Channel) Empty() error {
        c.Lock()
        defer c.Unlock()

        c.initPQ()
        for _, client := range c.clients {
            client.Empty()
        }

        for {
            select {
            case <-c.memoryMsgChan:
            default:
                goto finish
            }
        }

    finish:
        return c.backend.Empty()
    }

    // flush persists all the messages in internal memory buffers to the backend
    // it does not drain inflight/deferred because it is only called in Close()
    func (c *Channel) flush() error {
        var msgBuf bytes.Buffer

        if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
            c.ctx.nsqd.logf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
                c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
        }

        for {
            select {
            case msg := <-c.memoryMsgChan:
                err := writeMessageToBackend(&msgBuf, msg, c.backend)
                if err != nil {
                    c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
                }
            default:
                goto finish
            }
        }

    finish:
        for _, msg := range c.inFlightMessages {
            err := writeMessageToBackend(&msgBuf, msg, c.backend)
            if err != nil {
                c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
            }
        }

        for _, item := range c.deferredMessages {
            msg := item.Value.(*Message)
            err := writeMessageToBackend(&msgBuf, msg, c.backend)
            if err != nil {
                c.ctx.nsqd.logf("ERROR: failed to write message to backend - %s", err)
            }
        }

        return nil
    }

    func (c *Channel) Depth() int64 {
        return int64(len(c.memoryMsgChan)) + c.backend.Depth()
    }

    func (c *Channel) Pause() error {
        return c.doPause(true)
    }

    func (c *Channel) UnPause() error {
        return c.doPause(false)
    }

    func (c *Channel) doPause(pause bool) error {
        if pause {
            atomic.StoreInt32(&c.paused, 1)
        } else {
            atomic.StoreInt32(&c.paused, 0)
        }

        c.RLock()
        for _, client := range c.clients {
            if pause {
                client.Pause()
            } else {
                client.UnPause()
            }
        }
        c.RUnlock()
        return nil
    }

    func (c *Channel) IsPaused() bool {
        return atomic.LoadInt32(&c.paused) == 1
    }

    // PutMessage writes a Message to the queue
    func (c *Channel) PutMessage(m *Message) error {
        c.RLock()
        defer c.RUnlock()
        if atomic.LoadInt32(&c.exitFlag) == 1 {
            return errors.New("exiting")
        }
        err := c.put(m)
        if err != nil {
            return err
        }
        atomic.AddUint64(&c.messageCount, 1)
        return nil
    }

    func (c *Channel) put(m *Message) error {
        select {
        case c.memoryMsgChan <- m:
        default:
            b := bufferPoolGet()
            err := writeMessageToBackend(b, m, c.backend)
            bufferPoolPut(b)
            c.ctx.nsqd.SetHealth(err)
            if err != nil {
                c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
                    c.name, err)
                return err
            }
        }
        return nil
    }

    // TouchMessage resets the timeout for an in-flight message
    func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error {
        msg, err := c.popInFlightMessage(clientID, id)
        if err != nil {
            return err
        }
        c.removeFromInFlightPQ(msg)

        newTimeout := time.Now().Add(clientMsgTimeout)
        if newTimeout.Sub(msg.deliveryTS) >=
            c.ctx.nsqd.getOpts().MaxMsgTimeout {
            // we would have gone over, set to the max
            newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout)
        }

        msg.pri = newTimeout.UnixNano()
        err = c.pushInFlightMessage(msg)
        if err != nil {
            return err
        }
        c.addToInFlightPQ(msg)
        return nil
    }

    // FinishMessage successfully discards an in-flight message
    func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
        msg, err := c.popInFlightMessage(clientID, id)
        if err != nil {
            return err
        }
        c.removeFromInFlightPQ(msg)
        if c.e2eProcessingLatencyStream != nil {
            c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
        }
        return nil
    }

    // RequeueMessage requeues a message based on `time.Duration`, ie:
    //
    // `timeoutMs` == 0 - requeue a message immediately
    // `timeoutMs`  > 0 - asynchronously wait for the specified timeout
    //     and requeue a message (aka "deferred requeue")
    //
    func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
        // remove from inflight first
        msg, err := c.popInFlightMessage(clientID, id)
        if err != nil {
            return err
        }
        c.removeFromInFlightPQ(msg)

        if timeout == 0 {
            c.exitMutex.RLock()
            err := c.doRequeue(msg)
            c.exitMutex.RUnlock()
            return err
        }

        // deferred requeue
        return c.StartDeferredTimeout(msg, timeout)
    }

    // AddClient adds a client to the Channel's client list
    func (c *Channel) AddClient(clientID int64, client Consumer) {
        c.Lock()
        defer c.Unlock()

        _, ok := c.clients[clientID]
        if ok {
            return
        }
        c.clients[clientID] = client
    }

    // RemoveClient removes a client from the Channel's client list
    func (c *Channel) RemoveClient(clientID int64) {
        c.Lock()
        defer c.Unlock()

        _, ok := c.clients[clientID]
        if !ok {
            return
        }
        delete(c.clients, clientID)

        if len(c.clients) == 0 && c.ephemeral == true {
            go c.deleter.Do(func() { c.deleteCallback(c) })
        }
    }

    func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
        now := time.Now()
        msg.clientID = clientID
        msg.deliveryTS = now
        msg.pri = now.Add(timeout).UnixNano()
        err := c.pushInFlightMessage(msg)
        if err != nil {
            return err
        }
        c.addToInFlightPQ(msg)
        return nil
    }

    func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
        absTs := time.Now().Add(timeout).UnixNano()
        item := &pqueue.Item{Value: msg, Priority: absTs}
        err := c.pushDeferredMessage(item)
        if err != nil {
            return err
        }
        c.addToDeferredPQ(item)
        return nil
    }

    // doRequeue performs the low level operations to requeue a message
    //
    // Callers of this method need to ensure that a simultaneous exit will not occur
    func (c *Channel) doRequeue(m *Message) error {
        err := c.put(m)
        if err != nil {
            return err
        }
        atomic.AddUint64(&c.requeueCount, 1)
        return nil
    }

    // pushInFlightMessage atomically adds a message to the in-flight dictionary
    func (c *Channel) pushInFlightMessage(msg *Message) error {
        c.inFlightMutex.Lock()
        _, ok := c.inFlightMessages[msg.ID]
        if ok {
            c.inFlightMutex.Unlock()
            return errors.New("ID already in flight")
        }
        c.inFlightMessages[msg.ID] = msg
        c.inFlightMutex.Unlock()
        return nil
    }

    // popInFlightMessage atomically removes a message from the in-flight dictionary
    func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) {
        c.inFlightMutex.Lock()
        msg, ok := c.inFlightMessages[id]
        if !ok {
            c.inFlightMutex.Unlock()
            return nil, errors.New("ID not in flight")
        }
        if msg.clientID != clientID {
            c.inFlightMutex.Unlock()
            return nil, errors.New("client does not own message")
        }
        delete(c.inFlightMessages, id)
        c.inFlightMutex.Unlock()
        return msg, nil
    }

    func (c *Channel) addToInFlightPQ(msg *Message) {
        c.inFlightMutex.Lock()
        c.inFlightPQ.Push(msg)
        c.inFlightMutex.Unlock()
    }

    func (c *Channel) removeFromInFlightPQ(msg *Message) {
        c.inFlightMutex.Lock()
        if msg.index == -1 {
            // this item has already been popped off the pqueue
            c.inFlightMutex.Unlock()
            return
        }
        c.inFlightPQ.Remove(msg.index)
        c.inFlightMutex.Unlock()
    }

    func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
        c.deferredMutex.Lock()
        // TODO: these map lookups are costly
        id := item.Value.(*Message).ID
        _, ok := c.deferredMessages[id]
        if ok {
            c.deferredMutex.Unlock()
            return errors.New("ID already deferred")
        }
        c.deferredMessages[id] = item
        c.deferredMutex.Unlock()
        return nil
    }

    func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) {
        c.deferredMutex.Lock()
        // TODO: these map lookups are costly
        item, ok := c.deferredMessages[id]
        if !ok {
            c.deferredMutex.Unlock()
            return nil, errors.New("ID not deferred")
        }
        delete(c.deferredMessages, id)
        c.deferredMutex.Unlock()
        return item, nil
    }

    func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
        c.deferredMutex.Lock()
        heap.Push(&c.deferredPQ, item)
        c.deferredMutex.Unlock()
    }

    func (c *Channel) processDeferredQueue(t int64) bool {
        c.exitMutex.RLock()
        defer c.exitMutex.RUnlock()

        if c.Exiting() {
            return false
        }

        dirty := false
        for {
            c.deferredMutex.Lock()
            item, _ := c.deferredPQ.PeekAndShift(t)
            c.deferredMutex.Unlock()

            if item == nil {
                goto exit
            }
            dirty = true

            msg := item.Value.(*Message)
            _, err := c.popDeferredMessage(msg.ID)
            if err != nil {
                goto exit
            }
            c.doRequeue(msg)
        }

    exit:
        return dirty
    }

    func (c *Channel) processInFlightQueue(t int64) bool {
        c.exitMutex.RLock()
        defer c.exitMutex.RUnlock()

        if c.Exiting() {
            return false
        }

        dirty := false
        for {
            c.inFlightMutex.Lock()
            msg, _ := c.inFlightPQ.PeekAndShift(t)
            c.inFlightMutex.Unlock()

            if msg == nil {
                goto exit
            }
            dirty = true

            _, err := c.popInFlightMessage(msg.clientID, msg.ID)
            if err != nil {
                goto exit
            }
            atomic.AddUint64(&c.timeoutCount, 1)
            c.RLock()
            client, ok := c.clients[msg.clientID]
            c.RUnlock()
            if ok {
                client.TimedOutMessage()
            }
            c.doRequeue(msg)
        }

    exit:
        return dirty
    }

  • 相关阅读:
    evernote100个做笔记的好方法
    平衡二叉树的调整模版
    晨间日记的奇迹
    hdu 2952 Counting Sheep
    hdu 1535 Invitation Cards
    poj 3259 Wormholes(spfa)
    poj 2263 Heavy Cargo(floyd)
    poj 3268 Silver Cow Party(SPFA)
    hdu 1690 Bus System
    hdu 3631 Shortest Path(Floyd)
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457257.html
Copyright © 2011-2022 走看看