1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 """ 4 @author: zengchunyun 5 """ 6 import pika 7 8 9 class MQServer(object): 10 def __init__(self, host, port=5672, exchange=None, exchange_type="topic"): 11 """ 12 初始化MQ设置 13 :param host: MQ服务器地址 14 :param port: MQ端口 15 :param exchange: 交换器名称 16 :param exchange_type: 交换器类型,默认关键字类型 17 :return: 18 """ 19 self.host = host 20 self.port = port 21 self.exchange = exchange 22 self.exchange_type = exchange_type 23 self.queue = None 24 self.connection = self.connect() 25 self.channel = self.connect_channel() 26 self.create_exchange() 27 28 def connect(self): 29 """ 30 连接MQ服务器 31 :return: 32 """ 33 return pika.BlockingConnection(pika.ConnectionParameters(host=self.host, port=self.port)) 34 35 def connect_channel(self): 36 """ 37 创建频道 38 :return: 39 """ 40 return self.connection.channel() 41 42 def create_exchange(self): 43 """ 44 定义交换器名称,防止发布时,如果交换器不存在,异常 45 :return: 46 """ 47 self.channel.exchange_declare(exchange=self.exchange, type=self.exchange_type) 48 49 def publish(self, exchange=None, routing_key=None, body=None): 50 """ 51 创建发布者 52 :param exchange: 交换器名称 53 :param routing_key: 路由KEY 54 :param body:消息主体 55 :return: 56 """ 57 if exchange: 58 self.exchange = exchange 59 self.channel.basic_publish(exchange=self.exchange, routing_key=routing_key, body=body) 60 self.close() 61 62 def consumer(self, exchange=None, routing_key=None, callback=None): 63 """ 64 创建消费者 65 :param exchange: 66 :param routing_key: 67 :param callback: 68 :return: 69 """ 70 if exchange: 71 self.exchange = exchange 72 self.create_queue() 73 self.channel.queue_bind(queue=self.queue, exchange=self.exchange, routing_key=routing_key) 74 self.channel.basic_consume(consumer_callback=callback, queue=self.queue, no_ack=True) 75 self.start() 76 77 def create_queue(self): 78 """ 79 生成队列,当关闭consumer时,加上exclusive=True,queue也会被删除 80 :return: 81 """ 82 self.queue = self.channel.queue_declare(exclusive=True).method.queue # 为每个消费者生成不同的队列 83 84 def close(self): 85 """ 86 关闭消息连接 87 :return: 88 """ 89 self.connection.close() 90 91 def start(self): 92 self.channel.start_consuming()
1.消息持久化存储
虽然有了消息反馈机制,但如果rabbitmq自身挂掉的话,那么任务还是会丢失,所以需要将任务持久化存储起来,
durable=True # 开启持久化设置,rabbitmq不允许使用不同的参数来重新定义存在的队列
self.queue = self.channel.queue_declare(exclusive=True,durable=True)
self.channel.exchange_declare(exchange=self.exchange, type=self.exchange_type, durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储
1 self.channel.basic_publish(exchange='', 2 routing_key=routing_key, 3 body=message, 4 properties=pika.BasicProperties( 5 delivery_mode = 2, # make message persistent 6 ))
2.公平调度(fair dispatch)
虽然每个工作者是依次分配到任务,但是每个任务不一定一样,可能有到任务比较重,执行时间长,有的任务比较轻,执行时间短,如果能公平调度最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务
1 channel.basic_qos(prefetch_count=1)
完整示例代码
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 11 message = ' '.join(sys.argv[1:]) or "Hello World!" 12 channel.basic_publish(exchange='', 13 routing_key='task_queue', 14 body=message, 15 properties=pika.BasicProperties( 16 delivery_mode = 2, # make message persistent 17 )) 18 print (" [x] Sent %r" % (message,)) 19 connection.close()
消费者代码
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.queue_declare(queue='task_queue', durable=True) 10 print( ' [*] Waiting for messages. To exit press CTRL+C') 11 12 def callback(ch, method, properties, body): 13 print (" [x] Received %r" % (body,)) 14 time.sleep( body.count('.') ) 15 print (" [x] Done") 16 ch.basic_ack(delivery_tag = method.delivery_tag) 17 18 channel.basic_qos(prefetch_count=1) 19 channel.basic_consume(callback, 20 queue='task_queue') 21 22 channel.start_consuming()