1、发布订阅模式
1.1、列表实现消息队列的局限性
通过list 的rpush 和 lpop(或 lpush和rpop)可以实现消息队列,但是会有很多局限性:
1)消费者需要不停地调用lpop ,通信的开销很大。
2)如果生产者生产消息的速度远大于消费者消费的速度,list会占用大量的内存。
3)不支持一对多的消息分发(但是也可以做一个消息分发服务,分发服务获取到消息后,再转发给需要这个消息的消费者,类似其发布订阅模式)
1.2、发布订阅模式
除了list实现消息队列之外,redis还提供了一组命令实现发布/订阅模式,发送者和接受者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。
* 频道的订阅与退订
客户端可以订阅一个或多个频道(频道也可以理解成queue),redis将所有频道的订阅关系保存在服务器状态的pubsub_channels属性里,pubsub_channels是一个字典结构,这个字典的键是频道名称,键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。
1)订阅
客户端执行subscribe命令订阅频道,服务器将客户端和频道在pubsub_channels字典中进行关联:
——如果该频道已经有订阅者,会将新订阅的客户端添加到订阅者链表末尾
——如果该频道还没有订阅者,首先在pubsub_channels字典中为频道创建一个键,并将键的值设置为一个空链表,然后将客户端添加到链表中。
2)退订
客户端执行unsubscribe命令退订频道,服务器将从pubsub_channels字典中解除客户端和频道关联关系:
——根据被退订频道的名称,在字典中找到频道对应的订阅者链表,然后从链表中删除退订客户端
——如果删除退订客户端后,订阅链表为空,则从字典中删除频道对应的键。
* 模式的订阅与退订
与频道类似,redis服务器也将所有模式的订阅关系保存在服务器状态的pubsub_patterns属性里,pubsub_patterns是一个链表结构,每个节点包含一个pubsubPattern结构,如下:
typedef struct pubsubPattern{ redisClient *client; /* 订阅模式的客户端 */ robj *pattern; /* 被订阅的模式 */ }pubsubPattern;
1)订阅
客户端执行psubscribe命令订阅模式,服务器会对每个被订阅的模式执行以下操作:
——新建一个pubsubPattern结构,将pattern属性设置为被订阅的模式,client属性设置为订阅模式的客户端
——将pubsubPattern结构添加到pubsub_patterns链表的末尾。
2)退订
客户端执行punsubscribe命令退订模式,服务器将在pubsub_patterns链表中查找并删除响应的节点。
* 发送消息
客户端执行publish <channel> <message>命令,将message发送给频道所有的订阅者:
1)遍历pubsub_channels字典,获取channel对应的订阅者链表,将message发送给所有订阅者
2)遍历pubsub_patterns链表,找到并将message发送给所有与channel匹配的pattern对应的订阅者。
* 获取(消费)消息
只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。
客户端执行pubsub命令后,会进入订阅状态,处于此状态下客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于"发布/订阅"之外的命令,否则会报错。
127.0.0.1:6379> subscribe channel_1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" /* 消息类型,表示订阅成功 */ 2) "channel_1" /* 频道名称 */ 3) (integer) 2 /* 订阅次频道的客户端数量 */
进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同。消息类型的取值可能是以下3个:
——subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
——message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
——unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。
如执行完订阅后,有一个客户端发送了向频道channel_1发送了"hello":
127.0.0.1:6379> publish channel_1 hello (integer) 1
订阅该频道的客户端会收到如下消息:
127.0.0.1:6379> subscribe channel_1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel_1" 3) (integer) 2 1) "message" 2) "channel_1" 3) "hello"
* 生产者与消费者java代码示例
生产者:
import redis.clients.jedis.Jedis; public class MessageProducer extends Thread { public static final String CHANNEL_KEY = "channel_1"; private volatile int count; public void putMessage(String message) { Jedis jedis = JedisPoolUtils.getJedis(); Long publish = jedis.publish(CHANNEL_KEY, message);//返回订阅者数量 System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish); count++; } @Override public synchronized void run() { for (int i = 0; i < 5; i++) { putMessage("message" + count); } } public static void main(String[] args) { MessageProducer messageProducer = new MessageProducer(); Thread t1 = new Thread(messageProducer, "thread1"); Thread t2 = new Thread(messageProducer, "thread2"); Thread t3 = new Thread(messageProducer, "thread3"); Thread t4 = new Thread(messageProducer, "thread4"); Thread t5 = new Thread(messageProducer, "thread5"); t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); } }
消费者:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class MessageConsumer implements Runnable { public static final String CHANNEL_KEY = "channel_1";//频道 public static final String EXIT_COMMAND = "exit";//结束程序的消息 private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//处理接收消息 public void consumerMessage() { Jedis jedis = JedisPoolUtils.getJedis(); jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一个参数是处理接收消息,第二个参数是订阅的消息频道 } @Override public void run() { while (true) { consumerMessage(); } } public static void main(String[] args) { MessageConsumer messageConsumer = new MessageConsumer(); Thread t1 = new Thread(messageConsumer, "thread5"); Thread t2 = new Thread(messageConsumer, "thread6"); t1.start(); t2.start(); } } /** * 继承JedisPubSub,重写接收消息的方法 */ class MyJedisPubSub extends JedisPubSub { @Override /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] **/ public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message); //接收到exit消息后退出 if (MessageConsumer.EXIT_COMMAND.equals(message)) { System.exit(0); } } }
2、事务
redis通过MULTI、EXEC、WATCH、DISCARD等命令来实现事务功能。事务提供了一种将多个命令请求打包,然后一次性、按顺序地执行多个命令的机制,并且在事务执行过程中,服务器不会中断事务而去执行其他命令请求。
2.1、事务的用法
redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)。
案例场景:tom和mic各有1000元,tom需要向mic转账100元,tom的账户余额减少100元,mic的账户余额增加100元。
127.0.0.1:6379>set tom 1000 OK 127.0.0.1:6379>set mic 1000 OK 127.0.0.1:6379>multi OK 127.0.0.1:6379>decrby tom 100 QUEUED 127.0.0.1:6379>incrby mic 100 QUEUED 127.0.0.1:6379>exec 1)(integer)900 2)(integer)1100 127.0.0.1:6379>get tom "900" 127.0.0.1:6379>get mic "1100"
通过multi的命令开启事务。事务不能嵌套,多个multi命令效果一样。multi执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。通过exec的命令执行事务。如果没有执行exec,所
有的命令都不会被执行。
如果中途不想执行事务了,可以调用discard可以清空事务队列,放弃执行。
* watch命令
它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare andSwap),也就是多个客户端更新变量的时候,会跟原值做比较,只有原值没有被其他客户端修改的情况下,才更新成新的值。可以用watch监视一个或者多个key,如果开启事务之后,有一个被监视key键在 exec 执
行之前被修改了, 那么整个事务都会被取消(key提前过期除外)。可以用unwatch取消。
2.2、事务执行发生问题的两种时机
事务执行过程中出现的问题从时间上分为两类:一种是在执行exec之前发生错误,一种是在执行exec之后发生错误。
1)在执行exec之前发生错误
这类主要是编译器方面的错误,比如:入队的命令存在语法错误,包括命令不存在、命令格式不正确等。
127.0.0.1:6379>multi OK 127.0.0.1:6379>set jijingyi 666 QUEUED 127.0.0.1:6379>hset qingshan 2673 (error)ERR wrong number of arguments for 'hset' command
127.0.0.1:6379>exec (error)EXECABORT Transaction discarded because of previous errors.
在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。
2)在执行exec之后发生错误
这是一种运行时错误,比如:类型错误,如对String使用了Hash的命令等。
127.0.0.1:6379>multi OK 127.0.0.1:6379>set k1 1 QUEUED 127.0.0.1:6379>hset k1 a b QUEUED 127.0.0.1:6379>exec 1)OK 2(error)WRONGTYPE Operation against a key holding the wrong kind of value 127.0.0.1:6379>get k1 "1"
最后发现set k1 1的命令是成功的,也就是在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。这个显然不符合对原子性的定义,也就是没办法用Redis 的事务机制来实现原子性,保证数据的一致。
思考:为什么执行exec之后发生错误,redis不回滚?
类似对key赋予错误的数据类型这种错误,在入队时无法检测到,redis认为这类的错误在程序开发过程中就能发现并解决掉,几乎不会出现在生产环境,所以不支持回滚,而且回滚会增加redis内部程序的复杂度,影响性能。
3、Lua脚本
Lua是一种c语言编写的轻量级脚本语言,跟数据库的存储过程有点类似,使用Lua脚本来执行redis命令有以下好处:
1)一次性发送多个命令,减少网络开销。
2)redis将整个脚本作为一个整体来执行,不会被其他请求中断,保存原子性。
3)可以将复杂的组合命令放在文件中,实现程序之间的命令集复用。
3.1、在redis中调用Lua脚本
使用eval方法,语法格式:
redis>eval lua-script key-num [key1key2key3....] [value1value2value3....]
eval 代表执行 Lua 语言的命令。
lua-script 代表 Lua 语言脚本内容。
key-num 表示参数中有多少个 key,需要注意的是 Redis 中 key 是从 1 开始的,如果没有 key 的参数,那么写 0。
[key1key2key3…]是 key 作为参数传递给 Lua 语言,也可以不填,但是需要和 key-num 的个数对应起来。
[value1value2value3 ….]这些参数传递给 Lua 语言,它们是可填可不填的。
示例:返回一个字符串,0个参数
redis>eval "return 'Hello World'" 0 redis>"Hello World"
3.2、在Lua脚本中调用redis命令
使用redis.call(command,key [param1,param2. . .])进行操作。语法格式:
redis>eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value
command 是命令,包括 set、get、del 等。
key 是被操作的键。
param1,param2…代表给 key 的参数
(1)redis中调用Lua脚本执行redis命令
redis>eval "return redis.call('set',KEYS[1],ARGV[1])" 1 jijingyi 2673
以上命令等价于set jijingyi 2673,在redis-cli 中直接写Lua脚本不够方便,也不能实现编辑和复用,通常会将脚本放在文件中,然后执行脚本文件。
(2)redis中执行Lua脚本文件,操作redis
创建Lua脚本文件:
cd /usr/local/soft/redis5.0.5/src vim test.lua
Lua脚本内容,先设置,再取值:
redis.call('set','jingyi','lua666') return redis.call('get','jingyi')
在Redis客户端中调用Lua脚本:
cd /usr/local/soft/redis5.0.5/src redis-cli --eval test.lua 0
3.3、使用案例:对IP进行限流
需求:在X秒内只能访问Y次。
设计思路:用key记录IP,用value记录访问次数。拿到IP以后,对IP+1。如果是第一次访问,对key设置过期时间(参数1)。否则判断次数,超过限定的次数(参数2),返回0。如果没有超过次数则返回1。超过时间,key过期之后,可以再次访问。
KEY[1]是IP, ARGV[1]是过期时间X,ARGV[2]是限制访问的次数Y。
--ip_limit.lua --IP 限流,对某个 IP 频率进行限制 ,6 秒钟访问 10 次 local num=redis.call('incr',KEYS[1]) if tonumber(num)==1 then redis.call('expire',KEYS[1],ARGV[1]) return 1 else if tonumber(num)>tonumber(ARGV[2])then return0 else return1 end
3.4、脚本缓存
在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis服务端,会产生比较大的网络开销。为了解决这个问题,Redis提供了EVALSHA命令,允许开发者通过脚本内容的SHA1摘要来执行脚本。
redis在执行script load命令时会计算脚本的SHA1摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:"NOSCRIPT No matching script. Please use EVAL."。
127.0.0.1:6379>script load "return 'Hello World'"
"470877a599ac74fbfda41caa908de682c5fc7d4b"
127.0.0.1:6379>evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0 "Hello World"
自乘案例:写一个自乘的运算,让它乘以后面的参数。
local curVal=redis.call("get",KEYS[1]) if curVal==false then curVal=0 else curVal=tonumber(curVal) end curVal=curVal * tonumber(ARGV[1]) redis.call("set",KEYS[1],curVal) return curVal
script load '命令' 获取摘要:
127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal)end;curVal=curVal*tonumber(ARGV[1]);redis.call("set",KEYS[1],curVal);returncurVal'
"be4f93d8a5379e5e5b768a74e77c8a4eb0434441"
调用:
127.0.0.1:6379>set num 2 OK
127.0.0.1:6379>evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
(integer)12
3.5、脚本超时
为了防止某个脚本执行时间过长导致 redis 无法提供服务,redis 提供了lua-time-limit参数限制脚本的最长运行时间,默认为5秒钟。lua-time-limit 5000(redis.conf配置文件中)。
当脚本运行时间超过这一限制后,redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。redis提供了一个script kill的命令来中止脚本的执行,但是如果当前执行的Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么
通过script kill命令是不能终止脚本运行的,这样是为了确保脚本运行的原子性。
遇到这种情况,只能通过shutdown nosave命令来强行终止redis。shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。