zoukankan      html  css  js  c++  java
  • Redis随记--StreamID生成

    Redis Stream ID

    基础知识

    在Redis Stream数据结构中,Stream ID用来表示流中的特定条目,在XADD时可以指定StreamID,也可以使用参数*来让Redis自动生成一个唯一的StreamID。

    自动生成StreamID有两个64位的整数组成,如1526919030474-55 ,第一部分位Redis实例的毫秒格式的UNIX时间,第二部分位递增的序列号(用来表示1毫秒内生成的条目)。

    StreamID保证始终递增,首先StreamID的第一部分会随着实例的时间增长而变大,其次StreamID的第二部分会保证同一秒内的消息条目按照插入顺序依次递增。当Redis主从实例因故障或其他原因发生切换后,如果当前Stream中的StreamID的第一部分大于当前实例的UNIX时间,Redis实例会继续使用当前StreamID的第一部分+递增第二部分,以保证StreamID唯一且递增。

    相关代码

    StreamID的数据结构:

    /* Stream item ID: a 128 bit number composed of a milliseconds time and
     * a sequence counter. IDs generated in the same millisecond (or in a past
     * millisecond if the clock jumped backward) will use the millisecond time
     * of the latest generated ID and an incremented sequence. */
    typedef struct streamID {
        uint64_t ms;        /* Unix time in milliseconds. */
        uint64_t seq;       /* Sequence number. */
    } streamID;
    

    向Stream中插入条目时代码:

    /* Adds a new item into the stream 's' having the specified number of
     * field-value pairs as specified in 'numfields' and stored into 'argv'.
     * Returns the new entry ID populating the 'added_id' structure.
     *
     * If 'use_id' is not NULL, the ID is not auto-generated by the function,
     * but instead the passed ID is uesd to add the new entry. In this case
     * adding the entry may fail as specified later in this comment.
     *
     * The function returns C_OK if the item was added, this is always true
     * if the ID was generated by the function. However the function may return
     * C_ERR if an ID was given via 'use_id', but adding it failed since the
     * current top ID is greater or equal. */
    int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
        
        /* Generate the new entry ID. */
        streamID id;
        if (use_id)
            id = *use_id;
        else
            streamNextID(&s->last_id,&id);
    
        /* Check that the new ID is greater than the last entry ID
         * or return an error. Automatically generated IDs might
         * overflow (and wrap-around) when incrementing the sequence 
           part. */
        if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
    

    获取下一个StreamID:

    /* Generate the next stream item ID given the previous one. If the current
     * milliseconds Unix time is greater than the previous one, just use this
     * as time part and start with sequence part of zero. Otherwise we use the
     * previous time (and never go backward) and increment the sequence. */
    void streamNextID(streamID *last_id, streamID *new_id) {
        uint64_t ms = mstime();
        if (ms > last_id->ms) {
            new_id->ms = ms;
            new_id->seq = 0;
        } else {
            *new_id = *last_id;
            streamIncrID(new_id);
        }
    }
    

    采用递增第二部分方式生成StreamID:

    /* Set 'id' to be its successor streamID */
    void streamIncrID(streamID *id) {
        if (id->seq == UINT64_MAX) {
            if (id->ms == UINT64_MAX) {
                /* Special case where 'id' is the last possible streamID... */
                id->ms = id->seq = 0;
            } else {
                id->ms++;
                id->seq = 0;
            }
        } else {
            id->seq++;
        }
    }
    
  • 相关阅读:
    【软件工程】个人项目作业
    【软件工程】个人博客作业
    【软件工程】第一次作业-热身!
    OO第四单元总结
    OO第三单元总结
    OO第二单元总结
    OO第一单元总结
    提问回顾与个人总结
    [技术博客] Django中文件的保存与访问
    OO第三单元博客作业
  • 原文地址:https://www.cnblogs.com/gaogao67/p/14413831.html
Copyright © 2011-2022 走看看