zoukankan      html  css  js  c++  java
  • RabbitMQ中交换机的消息分发机制

    RabbitMQ是一个消息代理,它接受和转发消息,是一个由 Erlang 语言开发的遵循AMQP协议的开源实现。在RabbitMQ中生产者不会将消息直接发送到队列当中,而是将消息直接发送到交换机(exchange),交换机用来接受生产者发送的消息并将这些消息发送给绑定的队列,即:生产者-->交换机-->队列。

    在RabbitMQ中最主要的三种交换机:1. fanout(广播交换机)  2. direct(直连交换机)  3. topic(话题交换机)

    1. fanout(广播交换机)

     fanout会将接受到的所有消息广播到它所绑定的所有队列当中(每个消费者都会收到所有的消息),对于广播交换机,消息路由键routing_key和队列绑定键routing_key的作用都会被忽略。

    fanout生产者:

     1 import pika
     2 
     3 
     4 class RabbitProducer(object):
     5     """
     6     与RabbitMq服务器建立连接
     7     """
     8 
     9     def __init__(self):
    10         self.conn = pika.BlockingConnection(
    11             pika.ConnectionParameters(host='localhost', port=5672)
    12         )
    13         self.channel = self.conn.channel()
    14 
    15         # 声明一个exchange交换机,交换机的类型为fanout广播.
    16         self.channel.exchange_declare(
    17             exchange='fanout_exchange', exchange_type='fanout', durable=True
    18         )
    19 
    20     def send_msg(self, message):
    21         """
    22         routing_key:绑定的key
    23         :param message:
    24         :return:
    25         """
    26         self.channel.basic_publish(
    27             exchange='fanout_exchange',
    28             routing_key='',  # 因为exchange的类型为fanout,所以routing_key的数值在这里将被忽略
    29             body=message,
    30             properties=pika.BasicProperties(
    31                 delivery_mode=2,
    32                 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的.
    33             ))
    34 
    35     def close(self):
    36         self.conn.close()
    37 
    38 
    39 if __name__ == "__main__":
    40     rabbit_producer = RabbitProducer()
    41     for i in range(10):
    42         message = 'hello world {}!'.format(i)
    43         rabbit_producer.send_msg(message)
    View Code

    消费者consumer1:

     1 import pika
     2 import uuid
     3 
     4 
     5 class RabbitConsumer(object):
     6     """
     7     fanout 消费者1
     8     """
     9 
    10     def __init__(self):
    11         self.conn = pika.BlockingConnection(
    12             pika.ConnectionParameters(host='localhost', port=5672)
    13         )
    14         self.channel = self.conn.channel()
    15 
    16         # 声明一个队列queue_consumer1,并进行持久化(防止服务器挂掉),exclusive设置为false
    17         self.channel.queue_declare(
    18             exclusive=False, durable=True, queue='queue_consumer1'
    19         )
    20 
    21         # 声明一个exhange交换机,其类型为fanout广播类型  与生产者的交换机一致
    22         self.channel.exchange_declare(
    23             exchange='fanout_exchange', exchange_type='fanout', durable=True
    24         )
    25 
    26         # 将队列queue_consumer1与该exchange交换机进行绑定
    27         self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer1')
    28 
    29     def call_back(self, method, body):
    30         """
    31         消费者对消息进行确认,防止消费者挂掉.
    32         :param method:
    33         :param body:
    34         :return:
    35         """
    36         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    37         print('接收到的消息为:{}'.format(str(body)))
    38 
    39     def receive_msg(self):
    40         print('consumer1开始接受消息...')
    41         # 当上一条消息未确认时,会告知RabbitMQ不要再发送消息给这个消费者了 可以控制流量
    42         self.channel.basic_qos(prefetch_count=1)
    43         self.channel.basic_consume(
    44             consumer_callback=self.call_back,
    45             queue='queue_consumer1',
    46             no_ack=False,  # 消费者对消息进行确认,防止消费者挂掉
    47             consumer_tag=str(uuid.uuid4())
    48         )
    49 
    50     def consume(self):
    51         self.receive_msg()
    52         self.channel.start_consuming()
    53 
    54 
    55 if __name__ == '__main__':
    56     rabbit_consumer = RabbitConsumer()
    57     rabbit_consumer.consume()
    View Code

    消费者consumer2:

     1 import pika
     2 import uuid
     3 
     4 
     5 class RabbitConsumer(object):
     6     def __init__(self):
     7         self.conn = pika.BlockingConnection(
     8             pika.ConnectionParameters(host='localhost', port=5672)
     9         )
    10         self.channel = self.conn.channel()
    11 
    12         # 声明一个队列queue_consumer2,并进行持久化(防止服务器挂掉),exclusive设置为false
    13         self.channel.queue_declare(
    14             exclusive=False, durable=True, queue='queue_consumer2'
    15         )
    16 
    17         # T声明一个exhange交换机,其类型为fanout广播类型
    18         self.channel.exchange_declare(
    19             exchange='fanout_exchange', exchange_type='fanout', durable=True
    20         )
    21 
    22         # 将队列queue_consumer2与该exchange交换机进行绑定
    23         self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer2')
    24 
    25     def call_back(self, method, body):
    26         """
    27         消费者对消息进行确认,防止消费者挂掉.
    28         :param method:
    29         :param body:
    30         :return:
    31         """
    32         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    33         print('接收到的消息为:{}'.format(str(body)))
    34 
    35     def receive_msg(self):
    36         print('consumer2开始接受消息...')
    37         self.channel.basic_consume(
    38             consumer_callback=self.call_back,
    39             queue='queue_consumer2',
    40             no_ack=False,
    41             consumer_tag=str(uuid.uuid4())
    42         )
    43 
    44     def consume(self):
    45         self.receive_msg()
    46         self.channel.start_consuming()
    47 
    48 
    49 if __name__ == '__main__':
    50     rabbit_consumer = RabbitConsumer()
    51     rabbit_consumer.consume()
    View Code

    fanout会将接受到的所有消息广播到消费者consumer1和消费者consumer2,交换机的缺陷:它只能无意识的播放,不够灵活地控制消息广播给指定的消费者
     

     2. direct(直连交换机)

    对于direct,根据绑定键判定应该将数据发送至哪个队列,消息进入队列,其绑定秘钥(routing_key)与消息的路由秘钥要完全匹配,当exchange使用相同的绑定秘钥(routing_key)去绑定多个队列也是合法的,在这种情况下direct exchange的效果等同于fanout exchange,交换机会将消息广播到所有匹配的队列当中。

    direct生产者:
     1 import pika
     2 
     3 
     4 class RabbitProducer(object):
     5     """
     6     与RabbitMq服务器建立连接
     7     """
     8 
     9     def __init__(self):
    10         self.conn = pika.BlockingConnection(
    11             pika.ConnectionParameters(host='localhost', port=5672)
    12         )
    13         self.channel = self.conn.channel()
    14 
    15         # 声明一个exchange交换机,交换机的类型为direct
    16         self.channel.exchange_declare(
    17             exchange='direct_exchange', exchange_type='direct', durable=True
    18         )
    19 
    20     def send_msg(self, routing_key, message):
    21         """
    22         :param routing_key: 消息的路由键 本例中为routing_info
    23         :param message: 生成者发送的消息
    24         :return:
    25         """
    26         self.channel.basic_publish(
    27             exchange='direct_exchange',
    28             routing_key=routing_key,
    29             body=message,
    30             properties=pika.BasicProperties(
    31                 delivery_mode=2,
    32                 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的.
    33             ))
    34 
    35     def close(self):
    36         self.conn.close()
    37 
    38 
    39 if __name__ == "__main__":
    40     rabbit_producer = RabbitProducer()
    41     routing_key = 'routing_info'
    42     for i in range(10):
    43         message = 'hello world {}!'.format(i)
    44         print('生产者发送的消息为:{}'.format(message))
    45         rabbit_producer.send_msg(routing_key, message)
    View Code
    direct消费者:
     1 import pika
     2 import uuid
     3 
     4 
     5 class RabbitConsumer(object):
     6     """
     7     消费者(订阅者)
     8     """
     9 
    10     def __init__(self):
    11         self.conn = pika.BlockingConnection(
    12             pika.ConnectionParameters(host='localhost', port=5672)
    13         )
    14         self.channel = self.conn.channel()
    15 
    16         # 消息持久化
    17         self.channel.queue_declare(
    18             exclusive=False, durable=True, queue='task_queue'
    19         )
    20 
    21         # 交换机类型为direct.
    22         self.channel.exchange_declare(
    23             exchange='direct_exchange', exchange_type='direct', durable=True
    24         )
    25 
    26         # 将队列与该exchange交换机进行绑定
    27         routing_keys = ['routing_info', 'aaa']
    28         for routing_key in routing_keys:
    29             self.channel.queue_bind(
    30                 exchange='direct_exchange', queue='task_queue', routing_key=routing_key
    31             )  # 如果生产者发生消息的routing_key与消费者绑定队列的routing_key相同则成功发送
    32 
    33     def call_back(self, channel, method, properties, body):
    34         """
    35         消费者对消息进行确认,防止消费者挂掉
    36         :param channel:
    37         :param method:
    38         :param properties:
    39         :param body:
    40         :return:
    41         """
    42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    43         print('接收到的消息为:{}'.format(str(body)))
    44 
    45     def receive_msg(self):
    46         print('开始接受消息...')
    47         self.channel.basic_qos(prefetch_count=1)  # TODO 告诉RabbitMQ,不要向我发送新的消息.
    48         self.channel.basic_consume(
    49             consumer_callback=self.call_back,
    50             queue='task_queue',
    51             no_ack=False,
    52             consumer_tag=str(uuid.uuid4())
    53         )
    54 
    55     def consume(self):
    56         self.receive_msg()
    57         self.channel.start_consuming()
    58 
    59 
    60 if __name__ == '__main__':
    61     rabbit_consumer = RabbitConsumer()
    62     rabbit_consumer.consume()
    View Code

    direct直连交换机相当于是fanout的升级版,当消费者的队列
    绑定的秘钥routing_key与生产者的routing_key相同时,消费者就会收到消息;当所有消费者的队列所绑定的routing_key都一样且与生产者相同时,就相当于fanout交换机
    
    

      3. topic(话题交换机)

    direct(直连交换机)虽然相当于fanout的升级版,但它仍然有局限性,它不能根据多个标准进行路由;topic(话题交换机)可以很好地解决这一问题:
    (1) 如果消息的路由秘钥与队列的绑定秘钥符合匹配规则,topic就会将消息发送到相应的队列当中
    (2) 对于绑定键(routing_key)有两个特殊的情况: * (星号)可以代替一个单词,#(散列)可以替代零个或多个单词
    (3) 对于发送到topic交换机消息的routing_key如果包含特殊字符,只能是由"."分割的单词表,如("zhangsan.lisi")

    topic 生产者:
     1 import pika
     2 
     3 
     4 class RabbitProducer(object):
     5     def __init__(self):
     6         self.conn = pika.BlockingConnection(
     7             pika.ConnectionParameters(host='localhost', port=5672)
     8         )
     9         self.channel = self.conn.channel()
    10 
    11         # 声明交换机,交换机的类型为topic
    12         self.channel.exchange_declare(
    13             exchange='logs_topic', exchange_type='topic', durable=True
    14         )
    15 
    16     def send_msg(self, routing_key, message):
    17         """
    18         :param routing_key: 消息的路由键
    19         :param message: 生成者发送的消息
    20         :return:
    21         """
    22         self.channel.basic_publish(
    23             exchange='logs_topic',
    24             routing_key=routing_key,
    25             body=message,
    26             properties=pika.BasicProperties(
    27                 delivery_mode=2,
    28                 # 消息进行持久化
    29             ))
    30 
    31     def close(self):
    32         self.conn.close()
    33 
    34 
    35 if __name__ == "__main__":
    36     rabbit_producer = RabbitProducer()
    37     routing_keys = ['info', "debug", "a.debug.b", "a.info.b"]
    38     for routing_key in routing_keys:
    39         message = 'hello world! {}'.format(routing_key)
    40         print('生产者发送的消息为:{}'.format(message))
    41         rabbit_producer.send_msg(routing_key, message)
    View Code
    topic 消费者1 --> 实现fanout交换机:
     1 """
     2 当topic交换机使用#绑定键绑定队列时,此时topic交换机就会将消息广播到所有的队列当中,
     3 不管消息的路由秘钥如何,此时topic交换机的效果等同于fanout:发送所有消息都会接受到
     4 """
     5 import pika
     6 import uuid
     7 
     8 
     9 class RabbitConsumer(object):
    10     def __init__(self):
    11         self.conn = pika.BlockingConnection(
    12             pika.ConnectionParameters(host='localhost', port=5672)
    13         )
    14         self.channel = self.conn.channel()
    15 
    16         # 消息持久化
    17         self.channel.queue_declare(
    18             exclusive=False, durable=True, queue='task_queue'
    19         )
    20 
    21         # 声明交换机,其类型为topic
    22         self.channel.exchange_declare(
    23             exchange='logs_topic', exchange_type='topic', durable=True
    24         )
    25 
    26         # 将队列与该交换机进行绑定
    27         routing_keys = ['#']  # 使用#绑定键时,它将接受所有的消息,同fanout效果一样.
    28         for routing_key in routing_keys:
    29             self.channel.queue_bind(
    30                 exchange='logs_topic', queue='task_queue', routing_key=routing_key
    31             )
    32 
    33     def call_back(self, channel, method, properties, body):
    34         """
    35         消费者对消息进行确认,防止消费者挂掉
    36         :param channel:
    37         :param method:
    38         :param properties:
    39         :param body:
    40         :return:
    41         """
    42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    43         print('接收到的消息为:{}'.format(str(body)))
    44 
    45     def receive_msg(self):
    46         print('开始接受消息...')
    47         self.channel.basic_qos(prefetch_count=1)
    48         self.channel.basic_consume(
    49             consumer_callback=self.call_back,
    50             queue='task_queue',
    51             no_ack=False,  # 消费者对消息进行确认
    52             consumer_tag=str(uuid.uuid4())
    53         )
    54 
    55     def consume(self):
    56         self.receive_msg()
    57         self.channel.start_consuming()
    58 
    59 
    60 if __name__ == '__main__':
    61     rabbit_consumer = RabbitConsumer()
    62     rabbit_consumer.consume()
    View Code
    topic 消费者2 --> 实现direct交换机:
     1 """
     2 当topic交换机没有使用*和#匹配符绑定键绑定队列时,此时topic交换机的效果等同于direct,
     3 会收到key相匹配的消息  如:info debug
     4 """
     5 import pika
     6 import uuid
     7 
     8 
     9 class RabbitConsumer(object):
    10     def __init__(self):
    11         self.conn = pika.BlockingConnection(
    12             pika.ConnectionParameters(host='localhost', port=5672)
    13         )
    14         self.channel = self.conn.channel()
    15 
    16         # 消息持久化
    17         self.channel.queue_declare(
    18             exclusive=False, durable=True, queue='work_queue'
    19         )
    20 
    21         #
    22         # 声明交换机,其类型为topic
    23         self.channel.exchange_declare(
    24             exchange='logs_topic', exchange_type='topic', durable=True
    25         )
    26 
    27         # 将队列与交换机进行绑定
    28         routing_keys = ['info', 'debug']
    29         for routing_key in routing_keys:
    30             self.channel.queue_bind(
    31                 exchange='logs_topic', queue='work_queue', routing_key=routing_key
    32             )
    33 
    34     def call_back(self, channel, method, properties, body):
    35         """
    36         消费者对消息进行确认,防止消费者挂掉
    37         :param channel:
    38         :param method:
    39         :param properties:
    40         :param body:
    41         :return:
    42         """
    43         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    44         print('接收到的消息为:{}'.format(str(body)))
    45 
    46     def receive_msg(self):
    47         print('开始接受消息...')
    48         self.channel.basic_qos(prefetch_count=1)
    49         self.channel.basic_consume(
    50             consumer_callback=self.call_back,
    51             queue='work_queue',
    52             no_ack=False,  # 消费者对消息进行确认
    53             consumer_tag=str(uuid.uuid4())
    54         )
    55 
    56     def consume(self):
    57         self.receive_msg()
    58         self.channel.start_consuming()
    59 
    60 
    61 if __name__ == '__main__':
    62     rabbit_consumer = RabbitConsumer()
    63     rabbit_consumer.consume()
    View Code
    topic 消费者3 --> 实现*.x.* 消息匹配:
     1 """
     2 匹配任意点分割的单词 生产者发送的:a.debug.b 则匹配了'*.debug.*'
     3 生产者发送的:a.info.b 则匹配了'*.info.*'
     4 """
     5 import pika
     6 import uuid
     7 
     8 
     9 class RabbitConsumer(object):
    10     def __init__(self):
    11         self.conn = pika.BlockingConnection(
    12             pika.ConnectionParameters(host='localhost', port=5672)
    13         )
    14         self.channel = self.conn.channel()
    15 
    16         # 消息持久化
    17         self.channel.queue_declare(
    18             exclusive=False, durable=True, queue='other_queue'
    19         )
    20 
    21         # 声明交换机,其类型为topic
    22         self.channel.exchange_declare(
    23             exchange='logs_topic', exchange_type='topic', durable=True
    24         )
    25 
    26         # 将队列与交换机进行绑定
    27         routing_keys = ['*.info.*', '*.debug.*', 'dfdf*']
    28         for routing_key in routing_keys:
    29             self.channel.queue_bind(
    30                 exchange='logs_topic', queue='other_queue', routing_key=routing_key
    31             )
    32 
    33     def call_back(self, channel, method, properties, body):
    34         """
    35         消费者对消息进行确认,防止消费者挂掉
    36         :param channel:
    37         :param method:
    38         :param properties:
    39         :param body:
    40         :return:
    41         """
    42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
    43         print('接收到的消息为:{}'.format(str(body)))
    44 
    45     def receive_msg(self):
    46         print('开始接受消息...')
    47         self.channel.basic_qos(prefetch_count=1)
    48         self.channel.basic_consume(
    49             consumer_callback=self.call_back,
    50             queue='other_queue',
    51             no_ack=False,  # 消费者对消息进行确认
    52             consumer_tag=str(uuid.uuid4())
    53         )
    54 
    55     def consume(self):
    56         self.receive_msg()
    57         self.channel.start_consuming()
    58 
    59 
    60 if __name__ == '__main__':
    61     rabbit_consumer = RabbitConsumer()
    62     rabbit_consumer.consume()
    View Code

    topic消费者执行结果:

    消费者1:

    消费者2:

    消费者3:

  • 相关阅读:
    EMV内核使用中的常见问题
    SM2国密证书合法性验证
    WP8.1中C++的winodws运行时组件位移操作的差异
    [源码]Literacy 快速反射读写对象属性,字段
    Vue 单文件元件 — vTabs
    vue-router路径计算问题
    前端跨域新方案尝试
    Vue 单文件原件 — vCheckBox
    JavaScript 功能类 Url.js
    Vue 学习笔记 — 组件初始化
  • 原文地址:https://www.cnblogs.com/FG123/p/10137383.html
Copyright © 2011-2022 走看看