- 介绍
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。
应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
- 特点
MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。
- 使用场景
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
- 通过RabbitMQ实现简单的队列通信(默认端口5672)
send端(publish)

1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters('localhost') 5 )#建立socket 6 7 channel = connection.channel() #声明一个管道 8 9 # 声明queue 10 channel.queue_declare(queue='hello',durable=True) #queue=队列名字,durable=True队列持久化 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 channel.basic_publish(exchange='', 14 routing_key='hello', 15 body='Hello World!', 16 properties=pika.BasicProperties( 17 delivery_mode=2,#使消息持久化 18 19 ) 20 )#routing_key=队列名,body=发送的消息 21 print(" [x] Sent 'Hello World!'") 22 connection.close()
receive端(consumer)

1 import pika 2 import time 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters('localhost') 6 ) 7 8 channel = connection.channel() 9 10 # You may ask why we declare the queue again ‒ we have already declared it in our previous code. 11 # We could avoid that if we were sure that the queue already exists. For example if send.py program 12 # was run before. But we're not yet sure which program to run first. In such cases it's a good 13 # practice to repeat declaring the queue in both programs. 14 channel.queue_declare(queue='hello',durable=True) 15 16 17 def callback(ch, method, properties, body):#ch管道的内存地址,method消息的信息 18 print('--?',ch,method,properties,body) 19 time.sleep(30) 20 print(" [x] Received %r" % body) 21 22 ch.basic_ack(delivery_tag=method.delivery_tag) 23 24 channel.basic_qos(prefetch_count=1)#消息公平分发(告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了) 25 #消费消息 26 channel.basic_consume(callback,#如果收到消息就调用callback处理消息 27 queue='hello', 28 # no_ack=True#no ackownledgement 29 ) 30 31 print(' [*] Waiting for messages. To exit press CTRL+C') 32 channel.start_consuming()
no_ack=True#当接收端在未完成接收工作时断开,不会给publish发送确认消息
- 几种工作模式
a.Work Queues(消息分发轮询)
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c)

import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 声明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()

import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
b.消息持久化
1 channel.queue_declare(queue='hello', durable=True)
durable=True
c.消息公平分发
1 channel.basic_qos(prefetch_count=1) (消费者端)
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
d.PublishSubscribe(消息发布订阅)
让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
fanout广播模式:

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
direct有选择的接收消息:
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic更细致的消息过滤:
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()