RabbitMQ是一个消息代理,它接受和转发消息,是一个由 Erlang 语言开发的遵循AMQP协议的开源实现。在RabbitMQ中生产者不会将消息直接发送到队列当中,而是将消息直接发送到交换机(exchange),交换机用来接受生产者发送的消息并将这些消息发送给绑定的队列,即:生产者-->交换机-->队列。
在RabbitMQ中最主要的三种交换机:1. fanout(广播交换机) 2. direct(直连交换机) 3. topic(话题交换机)
1. fanout(广播交换机)
fanout会将接受到的所有消息广播到它所绑定的所有队列当中(每个消费者都会收到所有的消息),对于广播交换机,消息路由键routing_key和队列绑定键routing_key的作用都会被忽略。
fanout生产者:
1 import pika 2 3 4 class RabbitProducer(object): 5 """ 6 与RabbitMq服务器建立连接 7 """ 8 9 def __init__(self): 10 self.conn = pika.BlockingConnection( 11 pika.ConnectionParameters(host='localhost', port=5672) 12 ) 13 self.channel = self.conn.channel() 14 15 # 声明一个exchange交换机,交换机的类型为fanout广播. 16 self.channel.exchange_declare( 17 exchange='fanout_exchange', exchange_type='fanout', durable=True 18 ) 19 20 def send_msg(self, message): 21 """ 22 routing_key:绑定的key 23 :param message: 24 :return: 25 """ 26 self.channel.basic_publish( 27 exchange='fanout_exchange', 28 routing_key='', # 因为exchange的类型为fanout,所以routing_key的数值在这里将被忽略 29 body=message, 30 properties=pika.BasicProperties( 31 delivery_mode=2, 32 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的. 33 )) 34 35 def close(self): 36 self.conn.close() 37 38 39 if __name__ == "__main__": 40 rabbit_producer = RabbitProducer() 41 for i in range(10): 42 message = 'hello world {}!'.format(i) 43 rabbit_producer.send_msg(message)
消费者consumer1:
1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 """ 7 fanout 消费者1 8 """ 9 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 声明一个队列queue_consumer1,并进行持久化(防止服务器挂掉),exclusive设置为false 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='queue_consumer1' 19 ) 20 21 # 声明一个exhange交换机,其类型为fanout广播类型 与生产者的交换机一致 22 self.channel.exchange_declare( 23 exchange='fanout_exchange', exchange_type='fanout', durable=True 24 ) 25 26 # 将队列queue_consumer1与该exchange交换机进行绑定 27 self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer1') 28 29 def call_back(self, method, body): 30 """ 31 消费者对消息进行确认,防止消费者挂掉. 32 :param method: 33 :param body: 34 :return: 35 """ 36 self.channel.basic_ack(delivery_tag=method.delivery_tag) 37 print('接收到的消息为:{}'.format(str(body))) 38 39 def receive_msg(self): 40 print('consumer1开始接受消息...') 41 # 当上一条消息未确认时,会告知RabbitMQ不要再发送消息给这个消费者了 可以控制流量 42 self.channel.basic_qos(prefetch_count=1) 43 self.channel.basic_consume( 44 consumer_callback=self.call_back, 45 queue='queue_consumer1', 46 no_ack=False, # 消费者对消息进行确认,防止消费者挂掉 47 consumer_tag=str(uuid.uuid4()) 48 ) 49 50 def consume(self): 51 self.receive_msg() 52 self.channel.start_consuming() 53 54 55 if __name__ == '__main__': 56 rabbit_consumer = RabbitConsumer() 57 rabbit_consumer.consume()
消费者consumer2:
1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 def __init__(self): 7 self.conn = pika.BlockingConnection( 8 pika.ConnectionParameters(host='localhost', port=5672) 9 ) 10 self.channel = self.conn.channel() 11 12 # 声明一个队列queue_consumer2,并进行持久化(防止服务器挂掉),exclusive设置为false 13 self.channel.queue_declare( 14 exclusive=False, durable=True, queue='queue_consumer2' 15 ) 16 17 # T声明一个exhange交换机,其类型为fanout广播类型 18 self.channel.exchange_declare( 19 exchange='fanout_exchange', exchange_type='fanout', durable=True 20 ) 21 22 # 将队列queue_consumer2与该exchange交换机进行绑定 23 self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer2') 24 25 def call_back(self, method, body): 26 """ 27 消费者对消息进行确认,防止消费者挂掉. 28 :param method: 29 :param body: 30 :return: 31 """ 32 self.channel.basic_ack(delivery_tag=method.delivery_tag) 33 print('接收到的消息为:{}'.format(str(body))) 34 35 def receive_msg(self): 36 print('consumer2开始接受消息...') 37 self.channel.basic_consume( 38 consumer_callback=self.call_back, 39 queue='queue_consumer2', 40 no_ack=False, 41 consumer_tag=str(uuid.uuid4()) 42 ) 43 44 def consume(self): 45 self.receive_msg() 46 self.channel.start_consuming() 47 48 49 if __name__ == '__main__': 50 rabbit_consumer = RabbitConsumer() 51 rabbit_consumer.consume()
fanout会将接受到的所有消息广播到消费者consumer1和消费者consumer2,交换机的缺陷:它只能无意识的播放,不够灵活地控制消息广播给指定的消费者
2. direct(直连交换机)
对于direct,根据绑定键判定应该将数据发送至哪个队列,消息进入队列,其绑定秘钥(routing_key)与消息的路由秘钥要完全匹配,当exchange使用相同的绑定秘钥(routing_key)去绑定多个队列也是合法的,在这种情况下direct exchange的效果等同于fanout exchange,交换机会将消息广播到所有匹配的队列当中。
direct生产者:
1 import pika 2 3 4 class RabbitProducer(object): 5 """ 6 与RabbitMq服务器建立连接 7 """ 8 9 def __init__(self): 10 self.conn = pika.BlockingConnection( 11 pika.ConnectionParameters(host='localhost', port=5672) 12 ) 13 self.channel = self.conn.channel() 14 15 # 声明一个exchange交换机,交换机的类型为direct 16 self.channel.exchange_declare( 17 exchange='direct_exchange', exchange_type='direct', durable=True 18 ) 19 20 def send_msg(self, routing_key, message): 21 """ 22 :param routing_key: 消息的路由键 本例中为routing_info 23 :param message: 生成者发送的消息 24 :return: 25 """ 26 self.channel.basic_publish( 27 exchange='direct_exchange', 28 routing_key=routing_key, 29 body=message, 30 properties=pika.BasicProperties( 31 delivery_mode=2, 32 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的. 33 )) 34 35 def close(self): 36 self.conn.close() 37 38 39 if __name__ == "__main__": 40 rabbit_producer = RabbitProducer() 41 routing_key = 'routing_info' 42 for i in range(10): 43 message = 'hello world {}!'.format(i) 44 print('生产者发送的消息为:{}'.format(message)) 45 rabbit_producer.send_msg(routing_key, message)
direct消费者:
1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 """ 7 消费者(订阅者) 8 """ 9 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='task_queue' 19 ) 20 21 # 交换机类型为direct. 22 self.channel.exchange_declare( 23 exchange='direct_exchange', exchange_type='direct', durable=True 24 ) 25 26 # 将队列与该exchange交换机进行绑定 27 routing_keys = ['routing_info', 'aaa'] 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='direct_exchange', queue='task_queue', routing_key=routing_key 31 ) # 如果生产者发生消息的routing_key与消费者绑定队列的routing_key相同则成功发送 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消费者对消息进行确认,防止消费者挂掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息为:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('开始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) # TODO 告诉RabbitMQ,不要向我发送新的消息. 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='task_queue', 51 no_ack=False, 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
direct直连交换机相当于是fanout的升级版,当消费者的队列绑定的秘钥routing_key与生产者的routing_key相同时,消费者就会收到消息;当所有消费者的队列所绑定的routing_key都一样且与生产者相同时,就相当于fanout交换机
3. topic(话题交换机)
direct(直连交换机)虽然相当于fanout的升级版,但它仍然有局限性,它不能根据多个标准进行路由;topic(话题交换机)可以很好地解决这一问题:
(1) 如果消息的路由秘钥与队列的绑定秘钥符合匹配规则,topic就会将消息发送到相应的队列当中
(2) 对于绑定键(routing_key)有两个特殊的情况: * (星号)可以代替一个单词,#(散列)可以替代零个或多个单词
(3) 对于发送到topic交换机消息的routing_key如果包含特殊字符,只能是由"."分割的单词表,如("zhangsan.lisi")
topic 生产者:
1 import pika 2 3 4 class RabbitProducer(object): 5 def __init__(self): 6 self.conn = pika.BlockingConnection( 7 pika.ConnectionParameters(host='localhost', port=5672) 8 ) 9 self.channel = self.conn.channel() 10 11 # 声明交换机,交换机的类型为topic 12 self.channel.exchange_declare( 13 exchange='logs_topic', exchange_type='topic', durable=True 14 ) 15 16 def send_msg(self, routing_key, message): 17 """ 18 :param routing_key: 消息的路由键 19 :param message: 生成者发送的消息 20 :return: 21 """ 22 self.channel.basic_publish( 23 exchange='logs_topic', 24 routing_key=routing_key, 25 body=message, 26 properties=pika.BasicProperties( 27 delivery_mode=2, 28 # 消息进行持久化 29 )) 30 31 def close(self): 32 self.conn.close() 33 34 35 if __name__ == "__main__": 36 rabbit_producer = RabbitProducer() 37 routing_keys = ['info', "debug", "a.debug.b", "a.info.b"] 38 for routing_key in routing_keys: 39 message = 'hello world! {}'.format(routing_key) 40 print('生产者发送的消息为:{}'.format(message)) 41 rabbit_producer.send_msg(routing_key, message)
topic 消费者1 --> 实现fanout交换机:
1 """ 2 当topic交换机使用#绑定键绑定队列时,此时topic交换机就会将消息广播到所有的队列当中, 3 不管消息的路由秘钥如何,此时topic交换机的效果等同于fanout:发送所有消息都会接受到 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='task_queue' 19 ) 20 21 # 声明交换机,其类型为topic 22 self.channel.exchange_declare( 23 exchange='logs_topic', exchange_type='topic', durable=True 24 ) 25 26 # 将队列与该交换机进行绑定 27 routing_keys = ['#'] # 使用#绑定键时,它将接受所有的消息,同fanout效果一样. 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='logs_topic', queue='task_queue', routing_key=routing_key 31 ) 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消费者对消息进行确认,防止消费者挂掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息为:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('开始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='task_queue', 51 no_ack=False, # 消费者对消息进行确认 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
topic 消费者2 --> 实现direct交换机:
1 """ 2 当topic交换机没有使用*和#匹配符绑定键绑定队列时,此时topic交换机的效果等同于direct, 3 会收到key相匹配的消息 如:info debug 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='work_queue' 19 ) 20 21 # 22 # 声明交换机,其类型为topic 23 self.channel.exchange_declare( 24 exchange='logs_topic', exchange_type='topic', durable=True 25 ) 26 27 # 将队列与交换机进行绑定 28 routing_keys = ['info', 'debug'] 29 for routing_key in routing_keys: 30 self.channel.queue_bind( 31 exchange='logs_topic', queue='work_queue', routing_key=routing_key 32 ) 33 34 def call_back(self, channel, method, properties, body): 35 """ 36 消费者对消息进行确认,防止消费者挂掉 37 :param channel: 38 :param method: 39 :param properties: 40 :param body: 41 :return: 42 """ 43 self.channel.basic_ack(delivery_tag=method.delivery_tag) 44 print('接收到的消息为:{}'.format(str(body))) 45 46 def receive_msg(self): 47 print('开始接受消息...') 48 self.channel.basic_qos(prefetch_count=1) 49 self.channel.basic_consume( 50 consumer_callback=self.call_back, 51 queue='work_queue', 52 no_ack=False, # 消费者对消息进行确认 53 consumer_tag=str(uuid.uuid4()) 54 ) 55 56 def consume(self): 57 self.receive_msg() 58 self.channel.start_consuming() 59 60 61 if __name__ == '__main__': 62 rabbit_consumer = RabbitConsumer() 63 rabbit_consumer.consume()
topic 消费者3 --> 实现*.x.* 消息匹配:
1 """ 2 匹配任意点分割的单词 生产者发送的:a.debug.b 则匹配了'*.debug.*' 3 生产者发送的:a.info.b 则匹配了'*.info.*' 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='other_queue' 19 ) 20 21 # 声明交换机,其类型为topic 22 self.channel.exchange_declare( 23 exchange='logs_topic', exchange_type='topic', durable=True 24 ) 25 26 # 将队列与交换机进行绑定 27 routing_keys = ['*.info.*', '*.debug.*', 'dfdf*'] 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='logs_topic', queue='other_queue', routing_key=routing_key 31 ) 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消费者对消息进行确认,防止消费者挂掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息为:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('开始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='other_queue', 51 no_ack=False, # 消费者对消息进行确认 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
topic消费者执行结果:
消费者1:
消费者2:
消费者3: