zoukankan      html  css  js  c++  java
  • 使用redis作为消息队列的用法

    背景

    最近项目有个需求需要动态更新规则,当时脑中想到的第一个方案是利用zk的监听机制,管理人员更新完规则将状态写入zk,集群中的机器监听zk的状态,当有状态变更后,集群中的机器开始拉取最新的配置。但由于公司技术选型,没有专门搭建zk集群,因此也不可能为这一个小需求去搭建zk集群。图为使用zk监听状态变化的流程。

    最后只好退而求其次,想到了使用redis的队列来做规则的更新

    消息队列

    首先做简单的引入。

    1. 队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
    2. 消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。

    从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。

    队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue(延迟队列 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueue(没有容量的队列)。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。

    MQ主要是用来:

    1. 解耦应用、
    2. 异步化消息
    3. 流量削峰填谷

    目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

    另外上面提到的“有界”和“无界”,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“无界”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列”。

    消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到“分布式”环境中而已。

    可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。

    redis 实现消息队列

    redis有一个数据类型叫list(列表),它的每个子元素都是 string 类型的双向链表。我们可以通过 push,pop 操作从链表的头部或者尾部添加删除元素。这使得 list 既可以用作栈,也可以用作队列。

    假如,我们有一个队列系统,把一个个任务放到队列中,另一个进程就把队列中的任务取出来执行。

    放到队列我们使用LPUSH,也就是往双向链表的尾部填充一个元素,这一端也叫生产者,是产生内容的一端。

    另一个进程使用RPOP往头部取出元素来执行,这一端也叫消费者。

    如果仅仅是这种方式来实现队列,它就是需要进程不断地循环队列,判断队列是不是有新元素,有的话就取出来执行,没有的话,就继续循环,但是这个总有一个时间间隔,你总得规定每隔一段时间去循环,虽然这个时间很小,但总有延迟,这种方式叫作轮循。有没有一种方式就是让不断执行一个redis命令,而redis中的列队有值就会通过命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。

    我们来演示一下它是如何实现的。

    $ redis-cli

    127.0.0.1:6379> BRPOP list1 0

    先执行BRPOP,假如队列list1没有值,它会返回nil,并且阻塞在那,在等另一个程序或进程往list1中填值。

    我们开启另一个redis端终。

    $ redis-cli

    127.0.0.1:6379> LPUSH list1 a

    (integer) 1

    我们再来看之前的结果。

    127.0.0.1:6379> BRPOP list1 0

    1) "list1"

    2) "a"

    (16.99s)

    这样就能把列表的值给取到了。

    优点

    1. 能够实现持久化
    2. 采用 Master-Slave 数据复制模式。队列操作都是写操作,Master任务繁重,能让Slave分担的持久化工作,就不要Master做。RDB和AOF两种方法都用上,多重保险。
    3. 支持集群
    4. 接口使用简单

    不足

    1. Redis上消息只会被一个消费者消费,不会有多个订阅者消费同一个消息,简单一对一
    2. 生产者或者消费者崩溃后的处理机制,需要自己实现
    3. 生产者写入太快,消费者消费太慢,导致Redis的内存问题,处理机制需要自己实现

    通过pub/sub来实现

    实现机制

    订阅,取消订阅和发布实现了发布/订阅消息范式,发布者不是计划发送消息给特定的订阅者。而是发布的消息分到不同的频道,不需要知道什么样的订阅者订阅。订阅者对一个或多个频道感兴趣,只需接收感兴趣的消息,不需要知道什么样的发布者发布的。

    这是一种基于非持久化的消息机制,消息发布者和订阅者必须同时在线,否则一旦消息订阅者由于各种异常情况而被迫断开连接,在其重新连接后,其离线期间的消息是无法被重新通知的(即发即弃)。

    Redis中的消息可以提供两种不同的功能。一类是基于Channel的消息,这一类消息和Redis中存储的Keys没有太多关联,也就是说即使不在Redis中存储任何Keys信息,这类消息也可以独立使用。另一类消息可以对(也可以不对)Redis中存储的Keys信息的变化事件进行通知,可以用来向订阅者通知Redis中符合订阅条件的Keys的各种事件。

    通过springboot 构建redis消息队列

    首先springboot配置文件配置如下:

    spring.redis.host=localhost

    spring.redis.port=6379

    消息生产者,注入redisTemplate,用convertAndSend发送消息

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.data.redis.core.StringRedisTemplate;

    import org.springframework.stereotype.Service;

    @Service

    public class PublishService {

    @Autowired

    private StringRedisTemplate stringRedisTemplate;

    public void sendMsg(String channel, String msg) {

    stringRedisTemplate.convertAndSend(channel, msg);

    }

    }

    消费者:创建一个接收消息的类,继承MessageListener,也可以不继承

    import lombok.extern.slf4j.Slf4j;

    import org.springframework.stereotype.Service;

    @Slf4j

    @Service

    public class RedisReceiver {

    public void receiveMessage(String message) {

    log.info("receive message is {}",message);

    }

    }

    消息订阅者配置类:

    import com.wuzy.queue.RedisReceiver;

    import org.springframework.context.annotation.Bean;

    import org.springframework.data.redis.connection.RedisConnectionFactory;

    import org.springframework.data.redis.core.StringRedisTemplate;

    import org.springframework.data.redis.listener.PatternTopic;

    import org.springframework.data.redis.listener.RedisMessageListenerContainer;

    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

    import org.springframework.stereotype.Component;

    /**

    * redis 监听配置

    */

    @Configuration

    public class RedisSubListenerConfig {

    /**

    * 初始化监听器

    *

    * @param connectionFactory

    * @param listenerAdapter

    * @return

    */

    @Bean

    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,

    MessageListenerAdapter listenerAdapter) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();

    container.setConnectionFactory(connectionFactory);

    container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致

    return container;

    }

    /**

    * 绑定消息监听者和接收监听的方法

    *

    * @param redisReceiver

    * @return

    */

    @Bean

    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {

    // redisReceiver 消息接收者

    // receiveMessage 消息接收后的方法

    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();

    messageListenerAdapter.setDefaultListenerMethod("receiveMessage");

    messageListenerAdapter.setDelegate(redisReceiver);

    return messageListenerAdapter;

    }

    @Bean

    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {

    return new StringRedisTemplate(connectionFactory);

    }

    }

    优点

    1. 一个生产者能够对应多个消费者
    2. 支持集群
    3. 接口使用简单

    不足

    1. Redis提供的订阅/发布功能并不完美,更不能和ActiveMQ/RabbitMQ提供的订阅/发布功能相提并论。
    2. 首先这些消息并没有持久化机制,属于即发即弃模式。也就是说它们不能像ActiveMQ中的消息那样保证持久化消息订阅者不会错过任何消息,无论这些消息订阅者是否随时在线。
    3. 由于本来就是即发即弃的消息模式,所以Redis也不需要专门制定消息的备份和恢复机制。
    4. 也是由于即发即弃的消息模式,所以Redis也没有必要专门对使用订阅/发布功能的客户端连接进行识别,用来明确该客户端连接的ID是否在之前已经连接过Redis服务了。ActiveMQ中保持持续通知的功能的前提,就是能够识别客户端连接ID的历史连接情况,以便确定哪些订阅消息这个客户端还没有处理。
    5. Redis也没有为发布者和订阅者准备保证消息性能的任何方案,例如在大量消息同时到达Redis服务是,如果消息订阅者来不及完成消费,就可能导致消息堆积。而ActiveMQ中有专门针对这种情况的慢消息机制。
  • 相关阅读:
    Python基础
    pip install psycopg2出现python setup.py egg_info failed with error code 1 in /tmp/pip-build-YtLeN3/psycopg2错误处理
    Python基础
    C语言基础
    benchmarks
    用 MuGo 搭建 Go Engine 在 KGS 对战
    GPU
    linux 杀掉僵尸进程 (zombie process, defunct)
    CMakeLists.txt 语法
    软件列表(按字母排序)
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/12708888.html
Copyright © 2011-2022 走看看