zoukankan      html  css  js  c++  java
  • 玩转redis-延时消息队列

    上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

    源码地址 使用demo

    产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。

    看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列

    redis有序集合(sorted set)

    redis有序集合,每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
    有序集合的成员是唯一的,但分数(score)却可以重复。

    简单操作

    添加数据

    127.0.0.1:6379> ZADD testSet1 5 a
    (integer) 1
    127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
    (integer) 3
    

    读取

    127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
    1) "b"
    127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
    1) "b"
    2) "a"
    

    也可以把score打出来

    127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
    1) "b"
    2) "1"
    3) "a"
    4) "5"
    

    查出所有的数据

    127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
    1) "b"
    2) "a"
    3) "d"
    4) "c"
    

    删除数据

    ZREMRANGEBYSCORE testSet1 0 2
    

    延时队列的实现思路

    总体的思路很简单,就是每一个valuescore保存的是时间,也就是说,在添加一个元素时他的score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

    还有一个问题,就是ZRANGEBYSCORElistpop不同,pop是取出元素并且会把元素在list中删除。ZRANGEBYSCORE只会取出数据不会把数据从sorted set中删除。解决方法1,利用redis事务,先ZRANGEBYSCORE取出数据,然后再用ZREMRANGEBYSCORE 把数据删除。

    具体实现-code

    添加延时消息,参数delay就是我们要延时多久:

    func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    	if delay <= 0 {
    		return errors.New("delay need great than zero")
    	}
    	tm := time.Now().Add(delay)
    	msg := NewMessage("", body)
    	msg.DelayTime = tm.Unix()
    
    	sendData, _ := json.Marshal(msg)
    	return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
    }
    

    使用,比如我们想过1秒再处理

    producer.PublishDelayMsg(topicName, body, time.Second)
    

    读取消息并处理
    这就比较简单了,就是在一个ticker里循环读取小于或等于当前时间的数据:

    func (s *consumer) startGetDelayMessage() {
    	go func() {
    		ticker := time.NewTicker(s.options.RateLimitPeriod)
    		defer func() {
    			log.Println("stop get delay message.")
    			ticker.Stop()
    		}()
    		topicName := s.topicName + zsetSuffix
    		for {
    			currentTime := time.Now().Unix()
    			select {
    			case <-s.ctx.Done():
    				log.Printf("context Done msg: %#v 
    ", s.ctx.Err())
    				return
    			case <-ticker.C:
    				var valuesCmd *redis.ZSliceCmd
    				_, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
    					valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
    					pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
    					return nil
    				})
    				if err != nil {
    					log.Printf("zset pip error: %#v 
    ", err)
    					continue
    				}
    				rev := valuesCmd.Val()
    				for _, revBody := range rev {
    					msg := &Message{}
    					json.Unmarshal([]byte(revBody.Member.(string)), msg)
    					if s.handler != nil {
    						s.handler.HandleMessage(msg)
    					}
    				}
    			}
    		}
    	}()
    }
    
  • 相关阅读:
    力拓题目 5-8-575,657,707,771
    力拓题目1-4-7,217,344,557
    解码,编码,文件的基本操作
    集合类型内置方法和拷贝浅拷贝深拷贝
    列表元祖字典内置方法
    数字类型内置方法
    字符串类型内置方法
    hdu2262 高斯消元
    hdu1757 构造矩阵
    poj1222 高斯消元
  • 原文地址:https://www.cnblogs.com/li-peng/p/12697110.html
Copyright © 2011-2022 走看看