第11章
1.rabbitMQ 2. redis
一.rabbitMQ:
人们写了有好多好多的开源的MQ服务器。其中大多数都是写出来用来解决特定问题的。它们不关心上面跑的是什么类型的消息,设计思想通常是和创建者息息相关的(消息的持久化,崩溃恢复等通常不在他们考虑范围内)。有三个专门设计用来做及其灵活的消息队列的程序值得关注:
· Apache ActiveMQ
· ZeroMQ
· RabbitMQ
Apache ActiveMQ 曝光率最高,不过看起来它有些问题,可能会造成丢消息。不可接受,下一个。
ZeroMQ 和 RabbitMQ 都支持一个开源的消息协议,成为AMQP。AMQP的一个优点是它是一个灵活和开放的协议,以便和另外两个商业化的Message Queue (IBM和Tibco)竞争,很好。不过ZeroMQ不支持消息持久化和崩溃恢复,不太好。剩下的只有RabbitMQ了。如果你不在意消息持久化和崩溃恢复,试试ZeroMQ吧,延迟很低,而且支持灵活的拓扑。
rabbitMQ是用Erlang写的。Erlang 是爱立信开发的高度并行的语言,用来跑在电话交换机上。在Erlang当中,充斥着大量轻量进程,它们之间用消息传递来通信。听起来思路和我们用消息队列的思路是一样的
而且,RabbitMQ支持持久化。是的,如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会回来。而且,正如在DigiTar(注:原文作者的公司)做事情期望的那样,它可以和Python无缝结合
AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。
AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)
RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制
队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器
队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具
交换机可以理解成具有路由表的路由程序
每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。
交换机当中有一系列的绑定(binding),即路由规则(routes)
你的消费者程序要负责创建你的交换机们(复数)
你已经创建了一个交换机。但是他并不知道要把消息送到哪个队列。你需要路由规则,即绑定(binding)
交换机有多种类型。他们都是做路由的,不过接受不同类型的绑定
所以要持久化消息的步骤如下:
1. 将交换机设成 durable。
2. 将队列设成 durable。
3. 将消息的 Delivery Mode 设置成2 。
以上摘自:http://blog.csdn.net/linvo/article/details/5750987
代码实现:
实现最简单的队列通信
send端
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #声明queue channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive端
#_*_coding:utf-8_*_ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
work Queues
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
消息提供者代码

1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #声明queue 8 channel.queue_declare(queue='task_queue') 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 import sys 12 13 message = ' '.join(sys.argv[1:]) or "Hello World!" 14 channel.basic_publish(exchange='', 15 routing_key='task_queue', 16 body=message, 17 properties=pika.BasicProperties( 18 delivery_mode = 2, # make message persistent 19 )) 20 print(" [x] Sent %r" % message) 21 connection.close()
消费者代码

1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(body.count(b'.')) 12 print(" [x] Done") 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 16 channel.basic_consume(callback, 17 queue='task_queue', 18 ) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
做一个任务可以花几秒。你可能会想知道,如果一个消费者开始一个长的任务和模具,它只部分完成,会发生什么事。我们目前的代码一旦RabbitMQ提供消息给客户立即从内存中移除。在这种情况下,如果你杀死一个工人,我们将失去的消息,这只是处理。我们也将失去所有被发送给这个特别的工人,但尚未处理的消息。
但我们不想失去任何任务。如果一个工人死了,我们希望将这项任务交给另一个工人。
为了确保消息不会丢失,RabbitMQ支持短信致谢。ACK(nowledgement)是从消费者发送回告诉RabbitMQ,特定的邮件已收到,处理和RabbitMQ是免费删除它。
如果一个消费者死亡(其通道关闭,连接被关闭,或TCP连接丢失)不发送ACK,RabbitMQ将会理解这个消息并没有完全处理,将它重新排队。如果有其他用户同时在线,它就会快速地传递到另一个消费者。这样,你可以肯定,没有消息丢失,即使工人偶尔死。
没有任何消息超时;RabbitMQ将会发送消息时,消费者死亡。这是很好的,即使处理一个消息需要一个非常,非常长的时间。
Message acknowledgments默认是打开的。在以前的例子中,我们明确地把他们从no_ack =真正的旗帜。现在是时候删除这个标志,并发送一个正确的确认,从工人,一旦我们完成了一项任务。
1 def callback(ch, method, properties, body): 2 print " [x] Received %r" % (body,) 3 time.sleep( body.count('.') ) 4 print " [x] Done" 5 ch.basic_ack(delivery_tag = method.delivery_tag) 6 7 channel.basic_consume(callback, 8 queue='hello')
使用此代码,我们可以肯定的是,即使你杀了一个工人使用Ctrl + C则是处理一个消息,什么也不会失去。工人死都不被认可的消息后很快会被退回
消息持久化
我们已经学会了如何确保消费者即使死了,任务不会丢失(默认情况下,如果想禁用使用no_ack = true)。但是我们的任务仍然是如果RabbitMQ服务器停止了。
当RabbitMQ退出或死机会忘记队列和消息除非你不告诉它。有两件事是必需的,以确保消息不会丢失:我们需要标记队列和消息的持久性。
首先,我们需要确保我们的队列RabbitMQ永远不会失去。为了做到这一要求,我们需要声明它是耐用的:
1 channel.queue_declare(queue='hello', durable=True)
虽然这个命令本身是正确的,但它不会在我们的设置中工作。这是因为我们已经定义了一个队列,称为“你好”,这是不持久的。RabbitMQ不允许你重新定义现有队列用不同的参数,将返回一个错误的任何程序,试图这么做。但有一个快速的解决方法-让我们声明一个名称不同的队列,对exampletask_queue:
1 channel.queue_declare(queue='task_queue', durable=True)
这queue_declare需要改变应用到生产者和消费者都编码。
在这一点上,我们相信,task_queue队列不会即使RabbitMQ重启了。现在我们要纪念我们的消息持久性-通过提供一个delivery_mode财产价值2。
1 channel.basic_publish(exchange='', 2 routing_key="task_queue", 3 body=message, 4 properties=pika.BasicProperties( 5 delivery_mode = 2, # make message persistent 6 ))
消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完, 同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还 没处理完的时候就不要再给我发新消息了。
1 channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码
生产者端

1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 11 message = ' '.join(sys.argv[1:]) or "Hello World!" 12 channel.basic_publish(exchange='', 13 routing_key='task_queue', 14 body=message, 15 properties=pika.BasicProperties( 16 delivery_mode = 2, # make message persistent 17 )) 18 print(" [x] Sent %r" % message) 19 connection.close()
消费者端

1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 print(' [*] Waiting for messages. To exit press CTRL+C') 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 time.sleep(body.count(b'.')) 15 print(" [x] Done") 16 ch.basic_ack(delivery_tag = method.delivery_tag) 17 18 channel.basic_qos(prefetch_count=1) 19 channel.basic_consume(callback, 20 queue='task_queue') 21 22 channel.start_consuming()
publish/Subscribe(消息发布/订阅)
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
交换是一件很简单的事。在一方,它接收来自生产者的信息,另一方则将它们推到队列。交换必须知道该如何处理它接收到的消息。它应该被添加到一个特定的队列吗?它应该被添加到许多队列吗?或者它应该被丢弃。由交换类型定义的规则
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
消息publisher

1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange='logs', 13 routing_key='', 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
消息subscriber

1 #_*_coding:utf-8_*_ 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 12 queue_name = result.method.queue 13 14 channel.queue_bind(exchange='logs', 15 queue=queue_name) 16 17 print(' [*] Waiting for logs. To exit press CTRL+C') 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) 25 26 channel.start_consuming()
有选择的接受消息(exchange Type=direct)
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
publisher

1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='direct_logs', 14 routing_key=severity, 15 body=message) 16 print(" [x] Sent %r:%r" % (severity, message)) 17 connection.close()
subscriber

1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 17 sys.exit(1) 18 19 for severity in severities: 20 channel.queue_bind(exchange='direct_logs', 21 queue=queue_name, 22 routing_key=severity) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()
更细致的消息过滤
虽然使用直接交换改善了我们的系统,它仍然有局限性-它不能做路由的基础上多个标准。
在我们的日志系统中,我们可能希望订阅不仅基于严重性的日志,也可以基于发出日志的源代码来进行日志记录。你可能从syslog UNIX工具知道这个概念,路由日志基于严重性(信息/警告/暴击…)和设备(认证/克隆/核心…)。
这会给我们很大的灵活性,我们可能要听关键的错误来自“克隆”也从核心所有日志”。
publisher

1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='topic_logs', 9 type='topic') 10 11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='topic_logs', 14 routing_key=routing_key, 15 body=message) 16 print(" [x] Sent %r:%r" % (routing_key, message)) 17 connection.close()
subscriber

1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='topic_logs', 9 type='topic') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 binding_keys = sys.argv[1:] 15 if not binding_keys: 16 sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) 17 sys.exit(1) 18 19 for binding_key in binding_keys: 20 channel.queue_bind(exchange='topic_logs', 21 queue=queue_name, 22 routing_key=binding_key) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
Remote procedure call (RPC)
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
1 fibonacci_rpc = FibonacciRpcClient() 2 result = fibonacci_rpc.call(4) 3 print("fib(4) is %r" % result)
RPC server

1 #_*_coding:utf-8_*_ 2 __author__ = 'Alex Li' 3 import pika 4 import time 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 8 channel = connection.channel() 9 10 channel.queue_declare(queue='rpc_queue') 11 12 def fib(n): 13 if n == 0: 14 return 0 15 elif n == 1: 16 return 1 17 else: 18 return fib(n-1) + fib(n-2) 19 20 def on_request(ch, method, props, body): 21 n = int(body) 22 23 print(" [.] fib(%s)" % n) 24 response = fib(n) 25 26 ch.basic_publish(exchange='', 27 routing_key=props.reply_to, 28 properties=pika.BasicProperties(correlation_id = 29 props.correlation_id), 30 body=str(response)) 31 ch.basic_ack(delivery_tag = method.delivery_tag) 32 33 channel.basic_qos(prefetch_count=1) 34 channel.basic_consume(on_request, queue='rpc_queue') 35 36 print(" [x] Awaiting RPC requests") 37 channel.start_consuming()
RPC client

1 import pika 2 import uuid 3 4 class FibonacciRpcClient(object): 5 def __init__(self): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 9 self.channel = self.connection.channel() 10 11 result = self.channel.queue_declare(exclusive=True) 12 self.callback_queue = result.method.queue 13 14 self.channel.basic_consume(self.on_response, no_ack=True, 15 queue=self.callback_queue) 16 17 def on_response(self, ch, method, props, body): 18 if self.corr_id == props.correlation_id: 19 self.response = body 20 21 def call(self, n): 22 self.response = None 23 self.corr_id = str(uuid.uuid4()) 24 self.channel.basic_publish(exchange='', 25 routing_key='rpc_queue', 26 properties=pika.BasicProperties( 27 reply_to = self.callback_queue, 28 correlation_id = self.corr_id, 29 ), 30 body=str(n)) 31 while self.response is None: 32 self.connection.process_data_events() 33 return int(self.response) 34 35 fibonacci_rpc = FibonacciRpcClient() 36 37 print(" [x] Requesting fib(30)") 38 response = fibonacci_rpc.call(30) 39 print(" [.] Got %r" % response)
二.redis
redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都 支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排 序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文 件,并且在此基础上实现了master-slave(主从)同步。
一、Redis安装和基本使用
1 wget http://download.redis.io/releases/redis-3.0.6.tar.gz 2 tar xzf redis-3.0.6.tar.gz 3 cd redis-3.0.6 4 make
启动服务端
1 src/redis-server
启动客户端
1 src/redis-cli 2 redis> set foo bar 3 OK 4 redis> get foo 5 "bar"
二、Python操作Redis
1 sudo pip install redis 2 or 3 sudo easy_install redis 4 or 5 源码安装 6 7 详见:https://github.com/WoLpH/redis-py
API使用
redis-py 的API的使用可以分类为:
- 连接方式
- 连接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 发布订阅
1、操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 r = redis.Redis(host='10.211.55.4', port=6379) 7 r.set('foo', 'Bar') 8 print r.get('foo')
2、连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数 Redis,这样就可以实现多个Redis实例共享一个连接池。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 pool = redis.ConnectionPool(host='10.211.55.4', port=6379) 7 8 r = redis.Redis(connection_pool=pool) 9 r.set('foo', 'Bar') 10 print r.get('foo')
3、操作
String操作,redis中的String在在内存中按照一个name对应一个value来存储。如图
set(name, value, ex=None, px=None, nx=False, xx=False)
1 在Redis中设置值,默认,不存在则创建,存在则修改 2 参数: 3 ex,过期时间(秒) 4 px,过期时间(毫秒) 5 nx,如果设置为True,则只有name不存在时,当前set操作才执行 6 xx,如果设置为True,则只有name存在时,岗前set操作才执行
setnx(name, value)
1 设置值,只有name不存在时,执行设置操作(添加)
setex(name, value, time)
1 # 设置值 2 # 参数: 3 # time,过期时间(数字秒 或 timedelta对象)
psetex(name, time_ms, value)
1 # 设置值 2 # 参数: 3 # time_ms,过期时间(数字毫秒 或 timedelta对象)
mset(*args, **kwargs)
批量设置值 如: mset(k1='v1', k2='v2') 或 mget({'k1': 'v1', 'k2': 'v2'})
get(name)
1 获取值
mget(keys, *args)
1 批量获取 2 如: 3 mget('ylr', 'wupeiqi') 4 或 5 r.mget(['ylr', 'wupeiqi'])
getset(name, value)
1 设置新值并获取原来的值
getrange(key, start, end)
1 # 获取子序列(根据字节获取,非字符) 2 # 参数: 3 # name,Redis 的 name 4 # start,起始位置(字节) 5 # end,结束位置(字节) 6 # 如: "陈学涛" ,0-3表示 "陈"
setrange(name, offset, value)
1 # 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 2 # 参数: 3 # offset,字符串的索引,字节(一个汉字三个字节) 4 # value,要设置的值
setbit(name, offset, value)
1 # 对name对应值的二进制表示的位进行操作 2 3 # 参数: 4 # name,redis的name 5 # offset,位的索引(将值变换成二进制后再进行索引) 6 # value,值只能是 1 或 0 7 8 # 注:如果在Redis中有一个对应: n1 = "foo", 9 那么字符串foo的二进制表示为:01100110 01101111 01101111 10 所以,如果执行 setbit('n1', 7, 1),则就会将第7位设置为1, 11 那么最终二进制则变成 01100111 01101111 01101111,即:"goo" 12 13 # 扩展,转换二进制表示: 14 15 # source = "陈学涛" 16 source = "foo" 17 18 for i in source: 19 num = ord(i) 20 print bin(num).replace('b','') 21 22 特别的,如果source是汉字 "陈学涛"怎么办? 23 答:对于utf-8,每一个汉字占 3 个字节,那么 "陈学涛" 则有 9个字节 24 对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制 25 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
getbit(name, offset)
1 # 获取name对应的值的二进制表示中的某位的值 (0或1)
bitcount(key, start=None, end=None)
1 # 获取name对应的值的二进制表示中 1 的个数 2 # 参数: 3 # key,Redis的name 4 # start,位起始位置 5 # end,位结束位置
bitop(operation, dest, *keys)
1 # 获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值 2 3 # 参数: 4 # operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或) 5 # dest, 新的Redis的name 6 # *keys,要查找的Redis的name 7 8 # 如: 9 bitop("AND", 'new_name', 'n1', 'n2', 'n3') 10 # 获取Redis中n1,n2,n3对应的值,然后讲所有的值做位运算(求并集),然后将结果保存 new_name 对应的值中
strlen(name)
1 # 返回name对应值的字节长度(一个汉字3个字节)
incr(self, name, amount=1)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 2 3 # 参数: 4 # name,Redis的name 5 # amount,自增数(必须是整数) 6 7 # 注:同incrby
incrbyfloat(self, name, amount=1.0)
1 # 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 2 3 # 参数: 4 # name,Redis的name 5 # amount,自增数(浮点型)
decr(self, name, amount=1)
# 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。 # 参数: # name,Redis的name # amount,自减数(整数)
append(key, value)
1 # 在redis name对应的值后面追加内容 2 3 # 参数: 4 key, redis的name 5 value, 要追加的字符串
Hash操作,redis中Hash在内存中的存储格式如下图:
hset(name, key, value)
1 # name对应的hash中设置一个键值对(不存在,则创建;否则,修改) 2 3 # 参数: 4 # name,redis的name 5 # key,name对应的hash中的key 6 # value,name对应的hash中的value 7 8 # 注: 9 # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)
hmset(name, mapping)
# 在name对应的hash中批量设置键值对 # 参数: # name,redis的name # mapping,字典,如:{'k1':'v1', 'k2': 'v2'} # 如: # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
hget(name,key)
1 # 在name对应的hash中获取根据key获取value
hmget(name, keys, *args)
1 # 在name对应的hash中获取多个key的值 2 3 # 参数: 4 # name,reids对应的name 5 # keys,要获取key集合,如:['k1', 'k2', 'k3'] 6 # *args,要获取的key,如:k1,k2,k3 7 8 # 如: 9 # r.mget('xx', ['k1', 'k2']) 10 # 或 11 # print r.hmget('xx', 'k1', 'k2')
hgetall(name)
1 获取name对应hash的所有键值
hlen(name)
1 # 获取name对应的hash中键值对的个数
hkeys(name)
1 # 获取name对应的hash中所有的key的值
hvals(name)
1 # 获取name对应的hash中所有的value的值
hexists(name, key)
1 # 检查name对应的hash是否存在当前传入的key
hdel(name,*keys)
1 # 将name对应的hash中指定key的键值对删除
hincrby(name, key, amount=1)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 # 参数: 3 # name,redis中的name 4 # key, hash对应的key 5 # amount,自增数(整数)
hincrbyfloat(name, key, amount=1.0)
1 # 自增name对应的hash中的指定key的值,不存在则创建key=amount 2 3 # 参数: 4 # name,redis中的name 5 # key, hash对应的key 6 # amount,自增数(浮点数) 7 8 # 自增name对应的hash中的指定key的值,不存在则创建key=amount
hscan(name, cursor=0, match=None, count=None)
1 # 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆 2 3 # 参数: 4 # name,redis的name 5 # cursor,游标(基于游标分批取获取数据) 6 # match,匹配指定key,默认None 表示所有的key 7 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 8 9 # 如: 10 # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None) 11 # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None) 12 # ... 13 # 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
hscan_iter(name, match=None, count=None)
1 # 利用yield封装hscan创建生成器,实现分批去redis中获取数据 2 3 # 参数: 4 # match,匹配指定key,默认None 表示所有的key 5 # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数 6 7 # 如: 8 # for item in r.hscan_iter('xx'): 9 # print item
List操作,redis中的List在在内存中按照一个name对应一个List来存储。如图:
lpush(name,values)
1 # 在name对应的list中添加元素,每个新的元素都添加到列表的最左边 2 3 # 如: 4 # r.lpush('oo', 11,22,33) 5 # 保存顺序为: 33,22,11 6 7 # 扩展: 8 # rpush(name, values) 表示从右向左操作
lpushx(name,value)
1 # 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边 2 3 # 更多: 4 # rpushx(name, value) 表示从右向左操作
llen(name)
1 # name对应的list元素的个数
linsert(name, where, refvalue, value))
1 # 在name对应的列表的某一个值前或后插入一个新值 2 3 # 参数: 4 # name,redis的name 5 # where,BEFORE或AFTER 6 # refvalue,标杆值,即:在它前后插入数据 7 # value,要插入的数据
r.lset(name, index, value)
1 # 对name对应的list中的某一个索引位置重新赋值 2 3 # 参数: 4 # name,redis的name 5 # index,list的索引位置 6 # value,要设置的值
r.lrem(name, value, num)
1 # 在name对应的list中删除指定的值 2 3 # 参数: 4 # name,redis的name 5 # value,要删除的值 6 # num, num=0,删除列表中所有的指定值; 7 # num=2,从前到后,删除2个; 8 # num=-2,从后向前,删除2个
lpop(name)
1 # 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素 2 3 # 更多: 4 # rpop(name) 表示从右向左操作
lindex(name, index)
1 在name对应的列表中根据索引获取列表元素
lrange(name, start, end)
1 # 在name对应的列表分片获取数据 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置
ltrim(name, start, end)
1 # 在name对应的列表中移除没有在start-end索引之间的值 2 # 参数: 3 # name,redis的name 4 # start,索引的起始位置 5 # end,索引结束位置
rpoplpush(src, dst)
1 # 从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边 2 # 参数: 3 # src,要取数据的列表的name 4 # dst,要添加数据的列表的name
blpop(keys, timeout)
1 # 将多个列表排列,按照从左到右去pop对应列表的元素 2 3 # 参数: 4 # keys,redis的name的集合 5 # timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞 6 7 # 更多: 8 # r.brpop(keys, timeout),从右向左获取数据
brpoplpush(src, dst, timeout=0)
1 # 从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧 2 3 # 参数: 4 # src,取出并要移除元素的列表对应的name 5 # dst,要插入元素的列表对应的name 6 # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞
自定义增量迭代
1 # 由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要: 2 # 1、获取name对应的所有列表 3 # 2、循环列表 4 # 但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能: 5 6 def list_iter(name): 7 """ 8 自定义redis列表增量迭代 9 :param name: redis中的name,即:迭代name对应的列表 10 :return: yield 返回 列表元素 11 """ 12 list_count = r.llen(name) 13 for index in xrange(list_count): 14 yield r.lindex(name, index) 15 16 # 使用 17 for item in list_iter('pp'): 18 print item
Set操作,Set集合就是不允许重复的列表
sadd(name,values)
1 # name对应的集合中添加元素
scard(name)
1 获取name对应的集合中元素个数
sdiff(keys, *args)
1 在第一个name对应的集合中且不在其他name对应的集合的元素集合
sdiffstore(dest, keys, *args)
1 # 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
sinter(keys, *args)
# 获取多一个name对应集合的并集
sinterstore(dest, keys, *args)
# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
sismember(name, value)
1 # 检查value是否是name对应的集合的成员
smembers(name)
1 # 获取name对应的集合的所有成员
smove(src, dst, value)
1 # 将某个成员从一个集合中移动到另外一个集合
spop(name)
1 # 从集合的右侧(尾部)移除一个成员,并将其返回
srandmember(name, numbers)
1 # 从name对应的集合中随机获取 numbers 个元素
srem(name, values)
1 # 在name对应的集合中删除某些值
sunion(keys, *args)
1 # 获取多一个name对应的集合的并集
sunionstore(dest,keys, *args)
1 # 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
1 # 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
zadd(name, *args, **kwargs)
1 # 在name对应的有序集合中添加元素 2 # 如: 3 # zadd('zz', 'n1', 1, 'n2', 2) 4 # 或 5 # zadd('zz', n1=11, n2=22)
zcard(name)
1 # 获取name对应的有序集合元素的数量
zcount(name, min, max)
1 # 获取name对应的有序集合中分数 在 [min,max] 之间的个数
zincrby(name, value, amount)
1 # 自增name对应的有序集合的 name 对应的分数
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
1 # 按照索引范围获取name对应的有序集合的元素 2 3 # 参数: 4 # name,redis的name 5 # start,有序集合索引起始位置(非分数) 6 # end,有序集合索引结束位置(非分数) 7 # desc,排序规则,默认按照分数从小到大排序 8 # withscores,是否获取元素的分数,默认只获取元素的值 9 # score_cast_func,对分数进行数据转换的函数 10 11 # 更多: 12 # 从大到小排序 13 # zrevrange(name, start, end, withscores=False, score_cast_func=float) 14 15 # 按照分数范围获取name对应的有序集合的元素 16 # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) 17 # 从大到小排序 18 # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value)
1 # 获取某个值在 name对应的有序集合中的排行(从 0 开始) 2 3 # 更多: 4 # zrevrank(name, value),从大到小排序
zrangebylex(name, min, max, start=None, num=None)
1 # 当有序集合的所有成员都具有相同的分值时,有序集合的元素会根据成员的 值 (lexicographical ordering)来进行排序,而这个命令则可以返回给定的有序集合键 key 中, 元素的值介于 min 和 max 之间的成员 2 # 对集合中的每个成员进行逐个字节的对比(byte-by-byte compare), 并按照从低到高的顺序, 返回排序后的集合成员。 如果两个字符串有一部分内容是相同的话, 那么命令会认为较长的字符串比较短的字符串要大 3 4 # 参数: 5 # name,redis的name 6 # min,左区间(值)。 + 表示正无限; - 表示负无限; ( 表示开区间; [ 则表示闭区间 7 # min,右区间(值) 8 # start,对结果进行分片处理,索引位置 9 # num,对结果进行分片处理,索引后面的num个元素 10 11 # 如: 12 # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga 13 # r.zrangebylex('myzset', "-", "[ca") 结果为:['aa', 'ba', 'ca'] 14 15 # 更多: 16 # 从大到小排序 17 # zrevrangebylex(name, max, min, start=None, num=None)
zrem(name, values)
1 # 删除name对应的有序集合中值是values的成员 2 3 # 如:zrem('zz', ['s1', 's2'])
zremrangebyrank(name, min, max)
1 # 根据排行范围删除
zremrangebyscore(name, min, max)
1 # 根据分数范围删除
zremrangebylex(name, min, max)
1 # 根据值返回删除
zscore(name, value)
1 # 获取name对应有序集合中 value 对应的分数
zinterstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zunionstore(dest, keys, aggregate=None)
1 # 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作 2 # aggregate的值为: SUM MIN MAX
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
1 # 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作
其他常用操作
delete(*names)
1 # 根据删除redis中的任意数据类型
exists(name)
1 # 检测redis的name是否存在
keys(pattern='*')
1 # 根据模型获取redis的name 2 3 # 更多: 4 # KEYS * 匹配数据库中所有 key 。 5 # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 6 # KEYS h*llo 匹配 hllo 和 heeeeello 等。 7 # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time)
1 # 为某个redis的某个name设置超时时间
rename(src, dst)
1 2 # 对redis的name重命名为
move(name, db))
1 # 将redis的某个值移动到指定的db下
randomkey()
1 # 随机获取一个redis的name(不删除)
type(name)
1 # 获取name对应值的类型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
1 # 同字符串操作,用于增量迭代获取key
4、管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 pool = redis.ConnectionPool(host='10.211.55.4', port=6379) 7 8 r = redis.Redis(connection_pool=pool) 9 10 # pipe = r.pipeline(transaction=False) 11 pipe = r.pipeline(transaction=True) 12 13 r.set('name', 'alex') 14 r.set('role', 'sb') 15 16 pipe.execute()
5、发布订阅
发布者:服务器
订阅者:Dashboad和数据处理
Demo如下:

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 7 class RedisHelper: 8 9 def __init__(self): 10 self.__conn = redis.Redis(host='10.211.55.4') 11 self.chan_sub = 'fm104.5' 12 self.chan_pub = 'fm104.5' 13 14 def public(self, msg): 15 self.__conn.publish(self.chan_pub, msg) 16 return True 17 18 def subscribe(self): 19 pub = self.__conn.pubsub() 20 pub.subscribe(self.chan_sub) 21 pub.parse_response() 22 return pub
订阅者:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 from monitor.RedisHelper import RedisHelper 5 6 obj = RedisHelper() 7 redis_sub = obj.subscribe() 8 9 while True: 10 msg= redis_sub.parse_response() 11 print msg
发布者:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 from monitor.RedisHelper import RedisHelper 5 6 obj = RedisHelper() 7 obj.public('hello')
更多参见:https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/