zoukankan      html  css  js  c++  java
  • Redis高级特性

    1、发布订阅模式

    1.1、列表实现消息队列的局限性

      通过list 的rpush 和 lpop(或 lpush和rpop)可以实现消息队列,但是会有很多局限性:

        1)消费者需要不停地调用lpop ,通信的开销很大。

        2)如果生产者生产消息的速度远大于消费者消费的速度,list会占用大量的内存。

        3)不支持一对多的消息分发(但是也可以做一个消息分发服务,分发服务获取到消息后,再转发给需要这个消息的消费者,类似其发布订阅模式)

    1.2、发布订阅模式

      除了list实现消息队列之外,redis还提供了一组命令实现发布/订阅模式,发送者和接受者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。

    *  频道的订阅与退订

      客户端可以订阅一个或多个频道(频道也可以理解成queue),redis将所有频道的订阅关系保存在服务器状态的pubsub_channels属性里,pubsub_channels是一个字典结构,这个字典的键是频道名称,键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。

                              

    1)订阅

      客户端执行subscribe命令订阅频道,服务器将客户端和频道在pubsub_channels字典中进行关联:

        ——如果该频道已经有订阅者,会将新订阅的客户端添加到订阅者链表末尾

        ——如果该频道还没有订阅者,首先在pubsub_channels字典中为频道创建一个键,并将键的值设置为一个空链表,然后将客户端添加到链表中。

    2)退订

      客户端执行unsubscribe命令退订频道,服务器将从pubsub_channels字典中解除客户端和频道关联关系:

        ——根据被退订频道的名称,在字典中找到频道对应的订阅者链表,然后从链表中删除退订客户端

        ——如果删除退订客户端后,订阅链表为空,则从字典中删除频道对应的键。

    *  模式的订阅与退订

      与频道类似,redis服务器也将所有模式的订阅关系保存在服务器状态的pubsub_patterns属性里,pubsub_patterns是一个链表结构,每个节点包含一个pubsubPattern结构,如下:

    typedef struct pubsubPattern{
        redisClient *client; /* 订阅模式的客户端 */
        robj *pattern; /* 被订阅的模式 */
    }pubsubPattern;

                                                         

    1)订阅

      客户端执行psubscribe命令订阅模式,服务器会对每个被订阅的模式执行以下操作:

         ——新建一个pubsubPattern结构,将pattern属性设置为被订阅的模式,client属性设置为订阅模式的客户端

        ——将pubsubPattern结构添加到pubsub_patterns链表的末尾。

    2)退订

      客户端执行punsubscribe命令退订模式,服务器将在pubsub_patterns链表中查找并删除响应的节点。

    *  发送消息

      客户端执行publish  <channel>  <message>命令,将message发送给频道所有的订阅者:

        1)遍历pubsub_channels字典,获取channel对应的订阅者链表,将message发送给所有订阅者

        2)遍历pubsub_patterns链表,找到并将message发送给所有与channel匹配的pattern对应的订阅者。

     *  获取(消费)消息

      只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息

      客户端执行pubsub命令后,会进入订阅状态,处于此状态下客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于"发布/订阅"之外的命令,否则会报错。

    127.0.0.1:6379> subscribe channel_1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"   /* 消息类型,表示订阅成功 */
    2) "channel_1"  /* 频道名称 */
    3) (integer) 2  /*  订阅次频道的客户端数量 */

      进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同。消息类型的取值可能是以下3个:

        ——subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。

        ——message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。

        ——unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。

      如执行完订阅后,有一个客户端发送了向频道channel_1发送了"hello":

    127.0.0.1:6379> publish channel_1 hello
    (integer) 1

      订阅该频道的客户端会收到如下消息:

    127.0.0.1:6379> subscribe channel_1
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel_1"
    3) (integer) 2
    1) "message"
    2) "channel_1"
    3) "hello"

    *  生产者与消费者java代码示例

      生产者:

    import redis.clients.jedis.Jedis;
    
    public class MessageProducer extends Thread {
        public static final String CHANNEL_KEY = "channel_1";
        private volatile int count;
    
        public void putMessage(String message) {
            Jedis jedis = JedisPoolUtils.getJedis();
            Long publish = jedis.publish(CHANNEL_KEY, message);//返回订阅者数量
            System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);
            count++;
        }
    
        @Override
        public synchronized void run() {
            for (int i = 0; i < 5; i++) {
                putMessage("message" + count);
            }
        }
    
        public static void main(String[] args) {
            MessageProducer messageProducer = new MessageProducer();
            Thread t1 = new Thread(messageProducer, "thread1");
            Thread t2 = new Thread(messageProducer, "thread2");
            Thread t3 = new Thread(messageProducer, "thread3");
            Thread t4 = new Thread(messageProducer, "thread4");
            Thread t5 = new Thread(messageProducer, "thread5");
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t5.start();
        }
    }

      消费者:

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPubSub;
    
    public class MessageConsumer implements Runnable {
        public static final String CHANNEL_KEY = "channel_1";//频道
    
        public static final String EXIT_COMMAND = "exit";//结束程序的消息
    
        private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息
    
        public void consumerMessage() {
            Jedis jedis = JedisPoolUtils.getJedis();
            jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道
        }
    
        @Override
        public void run() {
            while (true) {
                consumerMessage();
            }
        }
    
        public static void main(String[] args) {
            MessageConsumer messageConsumer = new MessageConsumer();
            Thread t1 = new Thread(messageConsumer, "thread5");
            Thread t2 = new Thread(messageConsumer, "thread6");
            t1.start();
            t2.start();
        }
    }
    
    /**
     * 继承JedisPubSub,重写接收消息的方法
     */
    class MyJedisPubSub extends JedisPubSub {
        @Override
        /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
         * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
         * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
         * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
         **/
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //接收到exit消息后退出
            if (MessageConsumer.EXIT_COMMAND.equals(message)) {
                System.exit(0);
            }
        }
    }

    2、事务

      redis通过MULTI、EXEC、WATCH、DISCARD等命令来实现事务功能。事务提供了一种将多个命令请求打包,然后一次性、按顺序地执行多个命令的机制,并且在事务执行过程中,服务器不会中断事务而去执行其他命令请求。

    2.1、事务的用法

      redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)。

      案例场景:tom和mic各有1000元,tom需要向mic转账100元,tom的账户余额减少100元,mic的账户余额增加100元。

    127.0.0.1:6379>set tom 1000 
    OK 
    127.0.0.1:6379>set mic 1000 
    OK 
    127.0.0.1:6379>multi 
    OK 
    127.0.0.1:6379>decrby tom 100 
    QUEUED 
    127.0.0.1:6379>incrby mic 100 
    QUEUED 
    127.0.0.1:6379>exec
    1)(integer)900 
    2)(integer)1100 
    127.0.0.1:6379>get tom "900" 
    127.0.0.1:6379>get mic "1100"

      通过multi的命令开启事务。事务不能嵌套,多个multi命令效果一样。multi执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。通过exec的命令执行事务。如果没有执行exec,所

    有的命令都不会被执行。

      如果中途不想执行事务了,可以调用discard可以清空事务队列,放弃执行。

    *  watch命令

      它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare andSwap),也就是多个客户端更新变量的时候,会跟原值做比较,只有原值没有被其他客户端修改的情况下,才更新成新的值。可以用watch监视一个或者多个key,如果开启事务之后,有一个被监视key键在 exec 执

    行之前被修改了, 那么整个事务都会被取消(key提前过期除外)。可以用unwatch取消。

              

    2.2、事务执行发生问题的两种时机

      事务执行过程中出现的问题从时间上分为两类:一种是在执行exec之前发生错误,一种是在执行exec之后发生错误。

      1)在执行exec之前发生错误

        这类主要是编译器方面的错误,比如:入队的命令存在语法错误,包括命令不存在、命令格式不正确等。

    127.0.0.1:6379>multi 
    OK 
    127.0.0.1:6379>set jijingyi 666 
    QUEUED 
    127.0.0.1:6379>hset qingshan 2673 
    (error)ERR wrong number of arguments for 'hset' command 
    127.0.0.1:6379>exec (error)EXECABORT Transaction discarded because of previous errors.

        在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。

      2)在执行exec之后发生错误

        这是一种运行时错误,比如:类型错误,如对String使用了Hash的命令等。

    127.0.0.1:6379>multi 
    OK 
    127.0.0.1:6379>set k1 1 
    QUEUED 
    127.0.0.1:6379>hset k1 a b 
    QUEUED 
    127.0.0.1:6379>exec 
    1)OK 
    2(error)WRONGTYPE Operation against a key holding the wrong kind of value 
    127.0.0.1:6379>get k1 
    "1"

        最后发现set k1 1的命令是成功的,也就是在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。这个显然不符合对原子性的定义,也就是没办法用Redis 的事务机制来实现原子性,保证数据的一致。

    思考:为什么执行exec之后发生错误,redis不回滚?

      类似对key赋予错误的数据类型这种错误,在入队时无法检测到,redis认为这类的错误在程序开发过程中就能发现并解决掉,几乎不会出现在生产环境,所以不支持回滚,而且回滚会增加redis内部程序的复杂度,影响性能。

    3、Lua脚本

      Lua是一种c语言编写的轻量级脚本语言,跟数据库的存储过程有点类似,使用Lua脚本来执行redis命令有以下好处:

        1)一次性发送多个命令,减少网络开销。

        2)redis将整个脚本作为一个整体来执行,不会被其他请求中断,保存原子性。

        3)可以将复杂的组合命令放在文件中,实现程序之间的命令集复用。

    3.1、在redis中调用Lua脚本

      使用eval方法,语法格式:

    redis>eval lua-script key-num [key1key2key3....] [value1value2value3....]

     eval 代表执行 Lua 语言的命令。

     lua-script 代表 Lua 语言脚本内容。

     key-num 表示参数中有多少个 key,需要注意的是 Redis 中 key 是从 1 开始的,如果没有 key 的参数,那么写 0。

     [key1key2key3…]是 key 作为参数传递给 Lua 语言,也可以不填,但是需要和 key-num 的个数对应起来。

     [value1value2value3 ….]这些参数传递给 Lua 语言,它们是可填可不填的。

      示例:返回一个字符串,0个参数

    redis>eval "return 'Hello World'" 0
    redis>"Hello World"

    3.2、在Lua脚本中调用redis命令

      使用redis.call(command,key [param1,param2. . .])进行操作。语法格式:

    redis>eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value

     command 是命令,包括 set、get、del 等。

     key 是被操作的键。

     param1,param2…代表给 key 的参数

    (1)redis中调用Lua脚本执行redis命令

    redis>eval "return redis.call('set',KEYS[1],ARGV[1])" 1 jijingyi 2673 

      以上命令等价于set jijingyi 2673,在redis-cli 中直接写Lua脚本不够方便,也不能实现编辑和复用,通常会将脚本放在文件中,然后执行脚本文件。

    (2)redis中执行Lua脚本文件,操作redis

      创建Lua脚本文件:

    cd /usr/local/soft/redis5.0.5/src 
    vim test.lua

      Lua脚本内容,先设置,再取值:

    redis.call('set','jingyi','lua666') 
    return redis.call('get','jingyi')

      在Redis客户端中调用Lua脚本:

    cd /usr/local/soft/redis5.0.5/src 
    redis-cli --eval test.lua 0

    3.3、使用案例:对IP进行限流

      需求:在X秒内只能访问Y次。

      设计思路:用key记录IP,用value记录访问次数。拿到IP以后,对IP+1。如果是第一次访问,对key设置过期时间(参数1)。否则判断次数,超过限定的次数(参数2),返回0。如果没有超过次数则返回1。超过时间,key过期之后,可以再次访问。

        KEY[1]是IP, ARGV[1]是过期时间X,ARGV[2]是限制访问的次数Y。

    --ip_limit.lua 
    --IP 限流,对某个 IP 频率进行限制 ,6 秒钟访问 10 次 
    local num=redis.call('incr',KEYS[1]) 
    if tonumber(num)==1 then 
        redis.call('expire',KEYS[1],ARGV[1]) 
        return 1 
    else if tonumber(num)>tonumber(ARGV[2])then 
        return0 
    else 
        return1 
    end

    3.4、脚本缓存

      在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis服务端,会产生比较大的网络开销。为了解决这个问题,Redis提供了EVALSHA命令,允许开发者通过脚本内容的SHA1摘要来执行脚本。

      redis在执行script load命令时会计算脚本的SHA1摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:"NOSCRIPT No matching script. Please use EVAL."。

    127.0.0.1:6379>script load "return 'Hello World'" 
    "470877a599ac74fbfda41caa908de682c5fc7d4b"
    127.0.0.1:6379>evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0 "Hello World"

    自乘案例:写一个自乘的运算,让它乘以后面的参数。

    local curVal=redis.call("get",KEYS[1]) 
    if curVal==false then 
       curVal=0 
    else 
       curVal=tonumber(curVal) 
    end 
    curVal=curVal * tonumber(ARGV[1]) redis.call("set",KEYS[1],curVal) 
    return curVal

      script load '命令' 获取摘要:

    127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal)end;curVal=curVal*tonumber(ARGV[1]);redis.call("set",KEYS[1],curVal);returncurVal'
    "be4f93d8a5379e5e5b768a74e77c8a4eb0434441" 

      调用:

    127.0.0.1:6379>set num 2 
    OK 
    127.0.0.1:6379>evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
    (integer)12

    3.5、脚本超时

      为了防止某个脚本执行时间过长导致 redis 无法提供服务,redis 提供了lua-time-limit参数限制脚本的最长运行时间,默认为5秒钟。lua-time-limit 5000(redis.conf配置文件中)。

      当脚本运行时间超过这一限制后,redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。redis提供了一个script kill的命令来中止脚本的执行,但是如果当前执行的Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么

    通过script kill命令是不能终止脚本运行的,这样是为了确保脚本运行的原子性。

      遇到这种情况,只能通过shutdown nosave命令来强行终止redis。shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。

  • 相关阅读:
    基于Kibana的可视化监控报警插件sentinl入门
    es聚合学习笔记
    spark streaming基本概念一
    es实战一:基本概念
    访问者模式(Visitor Pattern)
    模板方法模式(Template Method Pattern)
    策略模式(Strategy Pattern)
    状态模式(State Pattern)
    观察者(Observer)模式
    备忘录模式(Memento Pattern)
  • 原文地址:https://www.cnblogs.com/jing-yi/p/12911483.html
Copyright © 2011-2022 走看看