RabbitMQ
好处: 解耦,异步,流量削峰
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
简单命令
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl list_queues
默认端口号 15672
模式
简单模式(最广泛)
参数
交换机模式
-- 发布订阅
-- 关键字模式
-- 模糊匹配模式
简单模式
- 连接rabbitmq
- 创建队列
- 向指定的队列插入数据
**生产者 **basic_publish
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', # 简单模式 交换机为空
routing_key='hello', # 指定队列
body='Hello World!') # 向指定队列插入内容
print(" [x] Sent 'Hello World!'")
**消费者 **basic_consume
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello',
auto_ack=True , #自动应答
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
参数使用
应答参数
模拟消费者出问题
生产者生产数据给消费者,消费者取到数据则队列无数据,若消费者出bug ,再次启动,则取不到数据了
auto_ack=False #改默认应答为手动应答
ch.basic_ack(delivery_tag=method.delivery_tag) # 给信号
原理
生产者生产数据放入队列,消费者改手动应答,消费者取数据。队列还会保留一份数据,当消费者发出信号后ch.basic_ack(delivery_tag=method.delivery_tag) ,队列则删除数据
手动应答牺牲效率
注重效率则默认应答
持久化参数
模拟队列(rabbitmq)出问题
#声明queue
channel.queue_declare(queue='hello2', durable=True) # durable=True 声明可持久化的队列
# 但是放数据的时候还得指定是否持久化
properties=pika.BasicProperties(
delivery_mode=2, #持久化参数2
)
channel.basic_publish(exchange='',
routing_key='hSSello2',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
) # 但是放数据的时候还得指定是否持久化
#properties=pika.BasicProperties(
# delivery_mode=2, # make # message persistent
# )
分发参数
改为手动应应答并且加上 channel.basic_qos(prefetch_count=1) # 消费者 加上这一句话
默认轮询分发,一人一个
轮询分发(round-robin)不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。
channel.basic_qos(prefetch_count=1) # 消费者 加上这一句话
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
import time
time.sleep(4)
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) ########
channel.basic_consume(queue='hello',
auto_ack=False,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
while True:
channel.start_consuming()
交换机模式(routingkey和exchange同时满足)
基于交换机通信(容器) 由生产者创建,向交换机插入数据
消费者创建队列 队列绑定交换机
发布订阅
# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', ## 交换机模式 名字任意
exchange_type='fanout') ## fanout 发布订阅模式
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
#####################################################################
# 消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', ## 交换机模式 名字任意
exchange_type='fanout') ## fanout 发布订阅模式
# 消费者创建队列
result = channel.queue_declare("",exclusive=True) # 随机名字 "" , exclusive=True
queue_name = result.method.queue # 拿到随机名字
# 绑定到交换机上
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
关键字
生产者放一个关键字 消费者放一个关键字
匹配成功后 则给消费者
# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', ## 交换机模式 名字任意
exchange_type='direct') ## direct 关键字模式
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='xxx', # 绑定关键字
body=message)
print(" [x] Sent %r" % message)
connection.close()
#####################################################################
# 消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', ## 交换机模式 名字任意
exchange_type='direct') ## direct 关键字模式
# 消费者创建队列
result = channel.queue_declare("",exclusive=True) # 随机名字 "" , exclusive=True
queue_name = result.method.queue # 拿到随机名字
# 绑定到交换机上
channel.queue_bind(exchange='logs',
queue=queue_name,routing_key='xxx') # routing_key 绑定关键字
[ 可以绑定多个关键字
channel.queue_bind(exchange='logs',
queue=queue_name,routing_key='xxx') # channel.queue_bind(exchange='logs',
queue=queue_name,routing_key='xxx') #
]
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
通配符
同关键字相比,模糊匹配
# 一个多个单词
* 一个单词
uss.#
#.news
#.weather
# 生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs3',
exchange_type='topic')
message = "info: Hello ERU!"
channel.basic_publish(exchange='logs3',
routing_key='europe.weather',
body=message)
print(" [x] Sent %r" % message)
connection.close()
# 消费者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs3',
exchange_type='topic')
result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs3',
queue=queue_name,
routing_key="#.news")
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()