zoukankan      html  css  js  c++  java
  • message.go

    package nsqd

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
        "time"
    )

    const (
        // MsgIDLength is the size, in bytes, of a MessageID.
        MsgIDLength       = 16
        // minValidMsgLength is the smallest serialized message decodeMessage
        // accepts: the ID plus the 8-byte timestamp and 2-byte attempts
        // counter, i.e. a message with an empty body.
        minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
    )

    // MessageID is the fixed-size (MsgIDLength bytes) identifier carried by
    // every Message. Being an array, it is copied by value and is comparable.
    type MessageID [MsgIDLength]byte

    // Message is a single queue message. Only ID, Body, Timestamp and Attempts
    // are part of the serialized form produced by WriteTo and read back by
    // decodeMessage; the unexported fields are never written out.
    type Message struct {
        ID        MessageID // fixed-size message identifier
        Body      []byte    // opaque payload
        Timestamp int64     // creation time; NewMessage sets it to time.Now().UnixNano()
        Attempts  uint16    // attempts counter; serialized as a big-endian uint16

        // for in-flight handling
        // NOTE(review): none of the fields below are read or written in this
        // file; their descriptions are inferred from names only — confirm
        // against the code that tracks in-flight/deferred messages.
        deliveryTS time.Time     // presumably when the message was handed to a client
        clientID   int64         // presumably the client holding the message
        pri        int64         // presumably a priority-queue ordering key
        index      int           // presumably the position inside a priority queue
        deferred   time.Duration // presumably the requested deferral delay
    }

    func NewMessage(id MessageID, body []byte) *Message {
        return &Message{
            ID:        id,
            Body:      body,
            Timestamp: time.Now().UnixNano(),
        }
    }

    func (m *Message) WriteTo(w io.Writer) (int64, error) {
        var buf [10]byte
        var total int64

        binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
        binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))

        n, err := w.Write(buf[:])
        total += int64(n)
        if err != nil {
            return total, err
        }

        n, err = w.Write(m.ID[:])
        total += int64(n)
        if err != nil {
            return total, err
        }

        n, err = w.Write(m.Body)
        total += int64(n)
        if err != nil {
            return total, err
        }

        return total, nil
    }

    // decodeMessage deserializes data (as []byte) and creates a new Message
    // message format:
    // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
    // |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
    // |       8-byte         ||    ||                 16-byte                      || N-byte
    // ------------------------------------------------------------------------------------------...
    //   nanosecond timestamp    ^^                   message ID                       message body
    //                        (uint16)
    //                         2-byte
    //                        attempts
    func decodeMessage(b []byte) (*Message, error) {
        var msg Message

        if len(b) < minValidMsgLength {
            return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
        }

        msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
        msg.Attempts = binary.BigEndian.Uint16(b[8:10])
        copy(msg.ID[:], b[10:10+MsgIDLength])
        msg.Body = b[10+MsgIDLength:]

        return &msg, nil
    }

    func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
        buf.Reset()
        _, err := msg.WriteTo(buf)
        if err != nil {
            return err
        }
        return bq.Put(buf.Bytes())
    }

  • 相关阅读:
    mysql 中文字段排序( 按拼音首字母排序) 的查询语句
    纯css3样式属性制作各种图形图标
    10个超有用的网页设计工具和资源
    手风琴导航效果实现
    css3动画导航实现
    java实现将资源文件转化成sql语句导入数据库
    select实现输入模糊匹配与选择双重功能
    js一些问题总结
    java实现excel与mysql的导入导出
    《C++程序设计》朝花夕拾
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457343.html
Copyright © 2011-2022 走看看