zoukankan      html  css  js  c++  java
  • 玩转redis-简单消息队列

    使用go语言基于redis写了一个简单的消息队列
    源码地址
    使用demo

    redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

    添加数据和获取数据的操作也是非常简单的
    LPUSH 从左边插入数据
    RPUSH 大右边插入数据
    LPOP 从左边取出一个数据
    RPOP 从右边取出一个数据

    127.0.0.1:6379> LPUSH list1 a
    (integer) 1
    127.0.0.1:6379> RPUSH list1 b
    (integer) 2
    127.0.0.1:6379> LPOP list1
    "a"
    127.0.0.1:6379> RPOP list1
    "b"
    

    或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
    如果这期间有数据写入,则会读取并返回,没有数据则会返回空
    在一个窗口1读取

    127.0.0.1:6379> BLPOP list1 10
    1) "list1"
    2) "a"
    

    在另一个窗口2写入

    127.0.0.1:6379> RPUSH list1 a b c
    (integer) 3
    

    再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

    127.0.0.1:6379> BRPOP list1 1
    1) "list1"
    2) "c"
    
    127.0.0.1:6379> BRPOP list1 1
    (nil)
    (1.04s)
    

    简单消息队列的实现

    如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

    当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

    下面我说一下使用list实现一个简单的消息队列的整体思路

    comsumer的实现

    consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
    这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

    type consumer struct {
    	once            sync.Once
    	redisCmd        redis.Cmdable
    	ctx             context.Context
    	topicName       string
    	handler         Handler
    	rateLimitPeriod time.Duration
    	options         ConsumerOptions
    	_               struct{}
    }
    
    type ConsumerOptions struct {
    	RateLimitPeriod time.Duration
    	UseBLPop        bool
    }
    
    

    看一下创建consumer的代码,最后面的opts参数是可选的配置

    type Consumer = *consumer
    
    func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    	consumer := &consumer{
    		redisCmd:  redisCmd,
    		ctx:       ctx,
    		topicName: topicName,
    	}
    	for _, o := range opts {
    		o(&consumer.options)
    	}
    	if consumer.options.RateLimitPeriod == 0 {
    		consumer.options.RateLimitPeriod = time.Microsecond * 200
    	}
    	return consumer
    }
    
    

    读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
    有一个小的interface调用者根据自己的逻辑去实现

    type Handler interface {
    	HandleMessage(msg *Message)
    }
    

    读取数据的逻辑使用一个gorouting实现

    func (s *consumer) startGetMessage() {
    	go func() {
    		ticker := time.NewTicker(s.options.RateLimitPeriod)
    		defer func() {
    			log.Println("stop get message.")
    			ticker.Stop()
    		}()
    		for {
    			select {
    			case <-s.ctx.Done():
    				log.Printf("context Done msg: %#v 
    ", s.ctx.Err())
    				return
    			case <-ticker.C:
    				var revBody []byte
    				var err error
    				if !s.options.UseBLPop {
    					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
    				} else {
    					revs := s.redisCmd.BLPop(time.Second, s.topicName)
    					err = revs.Err()
    					revValues := revs.Val()
    					if len(revValues) >= 2 {
    						revBody = []byte(revValues[1])
    					}
    				}
    				if err == redis.Nil {
    					continue
    				}
    				if err != nil {
    					log.Printf("LPOP error: %#v 
    ", err)
    					continue
    				}
    
    				if len(revBody) == 0 {
    					continue
    				}
    				msg := &Message{}
    				json.Unmarshal(revBody, msg)
    				if s.handler != nil {
    					s.handler.HandleMessage(msg)
    				}
    			}
    		}
    	}()
    }
    
    

    Producer 的实现

    Producer还是很简单的就是把数据推送到 reids

    type Producer struct {
    	redisCmd redis.Cmdable
    	_        struct{}
    }
    
    func NewProducer(cmd redis.Cmdable) *Producer {
    	return &Producer{redisCmd: cmd}
    }
    
    func (p *Producer) Publish(topicName string, body []byte) error {
    	msg := NewMessage("", body)
    	sendData, _ := json.Marshal(msg)
    	return p.redisCmd.RPush(topicName, string(sendData)).Err()
    }
    
  • 相关阅读:
    Business talking in English
    My strength (C-A-R)
    牛排有几分熟怎么说
    深入理解 Java try-with-resource 语法糖
    什么时候用异常,什么时候用断言?
    Java陷阱之assert关键字
    Java断言绝对不是鸡肋
    Java 条件编译
    Java 语法糖详解
    Java中有哪些语法糖?
  • 原文地址:https://www.cnblogs.com/li-peng/p/12659222.html
Copyright © 2011-2022 走看看