zoukankan      html  css  js  c++  java
  • RabbitMQ

    1、RabbitMQ

      RabbitMQ 是由 LShift 提供的一个Advanced Message Quenuing Protocol(AMQP)的开源实现,由以高性能,健壮性记忆可伸缩性出名的rlang 写成,因此也继承了这些优点

      很成熟,久经考验,应用广泛

      文档详细,客户端丰富,几乎常用语言都有RabbitMQ的开发库

    2、安装

      http://www.rabbitmq.com/install-rpm.html

      选择RPM 包下载,选择对应平台,本次装到CentOS6 上

      由于使用了erlang 语言开发,所以需要erlang包,该下载也提供了连接

      

      

      安装成功,查看安装的文件:

        

        

      配置:http://www.rabbitmq.com/configure.html#config-location

      环境变量:

      • 使用系统环境变量,如果没有使用rabbitmq-env.conf 中定义环境变量,否则 使用缺省值
      • RABBITMQ_NODE_IP_ADDRESS the  empty string, meaning that it should bind 同allnetwork interfaces 
      • RABBITMQ_NODE_PORT 5672
      • RABBITMQ_NODE_PORT RABBITMQ_NODE_PORT + 20000 内部节点和客户端 工具通信用
      • RABBITMQ_CONFIG_FILE 配置文件 路径默认为 /etc/rabbitmq/rabbitmq 

        15672:http端使用

          环境变量文件,可以不配置

      工作特性配置文件

        rabbitmq.config 配置文件

        3.7支持新旧两种格式配置文件格式

        1、erlang配置 文件格式,为了兼容继续采用

          

        2、sysctl格式,如果不需要兼容,RabbitMQ 鼓励使用

           

        这个文件也可以不配置

       插件管理

        列出所有可用插件

          # tabbitmq-plugins list

        启动WEB 管理插件

          # tabbitmq-plugins   enable  rabbitmq_management

        

         启动服务:

          # service  rabbitmq-server  start

          启动中,可能出现下面的错误:

          

         就是这个文件的权限问题,修改属主,属组即可

          

                   ||

                   ||

          

         服务启动成功:

             

         开始登陆 WEB 界面 ip:port

           

       使用guest/guest 只能本地登录,远程登录会报错

          

        rabbitmqctl

           

          

       用户管理:

        添加用户:

        删除用户:

           更改密码:

         设置权限Tags,其实就是分配组:

            

         设置rab 用户

              

             tag 的意义如下:

              administrator 可以管理用户,权限,虚拟主机

            

           基本信息:

        

            虚拟主机:

           /  为缺省虚拟机

           

         缺省虚拟缀,默认只能是guest 用户在本机连接,上图新建的用户rab 默认无法访问任何虚拟主机

        Python 库

           Pika 是纯Python实现的额支持AMQP协议的库

            $ pip  install pika

        RabbitMQ 工作原理及应用

           https://www.rabbitmq.com/getstarted.html

         

         

         上图,列出了RabbitMQ 的使用模式,学习上面的模式,对理解所有小写队列都很重要。

       名词解释

    名词 说明
    Server         服务器,接受客户端连接,实现消息队列即路由功能的继承(服务),也称为消息代理,注意,客户端包括生产者好消费者
    Connection   网络物理连接
    Channel 一个连接允许多个客户端连接
    Exchange

    交换器,接受生产者发来的消息,决定如何路由 给 服务器中的队列,常用的类型:direct(point-to -point)

    topic(publish-subscribe) ,  fanout(multicast)  {fanout:扇出,subcribe:订阅}

    Message 消息
    Message Queue 消息队列,数据的存储载体
    Bind 绑定,建立消息队列和交换器之间的关系,也就是说就交换器拿到数据,把什么样的数据传送给哪个队列
    Virtual Host 虚拟主机,一批交换机,消息队列和相关对象的集合,为了多用户互不干扰,使用虚拟主机分组交换机,消息队列
    Topic 主题,话题
    Broker 可等价位Server

      

      

    1、队列:  

        这种模式就是简单的生产者消费者模式,消息队列就是一个FIFO 的队列

          

         生产者 send.py   消费者 receive.py

        官方例子:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

         参照官网例子,写一个程序:

     1 # send.py
     2 import pika
     3 
     4 # 匹配连接参数
     5 params = pika.ConnectionParameters('192.168.112.111')
     6 # 建立连接
     7 connection = pika.BlockingConnection(params)
     8 
     9 
    10 with connection:
    11     # 建立通道
    12     channel = connection.channel()
    13     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    14     channel.queue_declare(queue='hello')
    15 
    16     channel.basic_publish(
    17         exchange= "" ,# 使用缺省exchange
    18         routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
    19         body= 'hello world'  # 消息
    20     )
    21     print('===== end ========')

        结果:

        访问被拒绝,还是权限问题,原因是guest 用户只能访问 localhost 上的 /  缺省虚拟主机 

         解决办法:

          缺省虚拟主机,默认只能在本机访问,不要修改为远程访问, 是安全的考虑

          因此,在Admin中Virtual hosts 中,新建一个虚拟主机test

          注意:新建的test 虚拟主机 的User 是谁,本次是 rab 用户

       

         在ConnectionParameters 中并没有用户名,密码填写的参数,它使用参数credentials 传入,这需要构建一个pika.credentials.Crendentuals对象

         测试:(修改后)

     1 # send.py
     2 import pika
     3 
     4 
     5 credential = pika.PlainCredentials('rab', '123456')
     6 # 匹配连接参数
     7 params = pika.ConnectionParameters(
     8     '192.168.112.111', 5672, # 地址,端口
     9     'test', # 虚拟主机
    10     credential # 用户名,密码
    11 )
    12 # 建立连接
    13 connection = pika.BlockingConnection(params)
    14 
    15 
    16 with connection:
    17     # 建立通道
    18     channel = connection.channel()
    19     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    20     channel.queue_declare(queue='hello')
    21 
    22     channel.basic_publish(
    23         exchange= "" ,# 使用缺省exchange
    24         routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
    25         body= 'hello world'  # 消息
    26     )
    27     print('===== end ========')

        结果:

    ===== end ========

      

         

      URLParameters, 也可以使用URL 创建参数 (直接替换就可以) 

          

    1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

       queue_declare 声明一个queue, 有必要的话,创建它

      basc_publish  exchange 为空就使用缺省exchange ,如果找不到指定的exchange 就抛异常

      使用缺省exchange, 就必须指定routing_key  使用它找到queue

      生产者代码做一些改动,使他能连接send message。

     1 # send.py
     2 import pika
     3 
     4 
     5 credential = pika.PlainCredentials('rab', '123456')
     6 # 匹配连接参数
     7 # params = pika.ConnectionParameters(
     8 #     '192.168.112.111', 5672, # 地址,端口
     9 #     'test', # 虚拟主机
    10 #     credential # 用户名,密码
    11 # )
    12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    13 # 建立连接
    14 connection = pika.BlockingConnection(params2)
    15 
    16 
    17 with connection:
    18     # 建立通道
    19     channel = connection.channel()
    20     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    21     channel.queue_declare(queue='hello')
    22 
    23     for _ in range(40):
    24         channel.basic_publish(
    25             exchange= "" ,# 使用缺省exchange
    26             routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
    27             body= 'hello world'  # 消息
    28         )
    29     print('===== end ========')

      结果:(加上之前的 一共42条了)

         receive.py 消费者代码

        单个消费者

     1 # receive.py
     2 import  pika
     3 
     4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     5 
     6 connection = pika.BlockingConnection(params2)
     7 channel = connection.channel()
     8 
     9 with connection:
    10     msg = channel.basic_get('hello',True) # 获取一个消息, True, 获取不到,不会阻塞
    11     print(msg)
    12     method, props, body = msg
    13     print(method)
    14     print(props)
    15     if body:
    16         print(body)
    17     else:
    18         print('empty')

         结果:

    1 (<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'hello world')
    2 <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])>
    3 <BasicProperties>
    4 b'hello world'

        如上:每执行一次,消费一条

        批量消费消息:

     1 # receive.py
     2 import  pika
     3 
     4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     5 connection = pika.BlockingConnection(params2)
     6 channel = connection.channel()
     7 
     8 def callback(channel, method, properties, body):
     9     print(body)
    10 
    11 with connection:
    12     channel.basic_consume(
    13         callback, # 消费回调函数
    14         queue='hello', # 队列名
    15         no_ack=True  # 不回应,不阻塞
    16     )
    17     channel.start_consuming()

         将hello 队列中的消息,都消费完,并一直处于消费状态 

       而这个Ack是TCP协议中的Ack此Ack的回复不关心消费者是否对接收到的数据进行了处理,当然也不关心处理数据所需要的耗时

    2、工作队列:

         

       继续使用队列模式的生产者消费者代码,启动2 个消费者

       观察结果,可以看到,2个消费者是交替拿到不同的消息

      这种工作 模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它

      从结果知道,使用的是轮询方式拿走数据的

      注意:虽然上面的图中没有画出exchange, 用到缺省的exchange  

       测试:代码

     1 # send.py
     2 import pika
     3 
     4 
     5 # credential = pika.PlainCredentials('rab', '123456')
     6 # 匹配连接参数
     7 # params = pika.ConnectionParameters(
     8 #     '192.168.112.111', 5672, # 地址,端口
     9 #     'test', # 虚拟主机
    10 #     credential # 用户名,密码
    11 # )
    12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    13 # 建立连接
    14 connection = pika.BlockingConnection(params2)
    15 
    16 
    17 with connection:
    18     # 建立通道
    19     channel = connection.channel()
    20     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    21     channel.queue_declare(queue='hello')
    22 
    23     for i in range(40):
    24         channel.basic_publish(
    25             exchange= "" ,# 使用缺省exchange
    26             routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
    27             body= '{}  hello world'.format(i)  # 消息
    28         )
    29     print('===== end ========')
    30 
    31 
    32 
    33 
    34 
    35 
    36 # receive.py
    37 import  pika
    38 import  time
    39 
    40 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    41 connection = pika.BlockingConnection(params2)
    42 channel = connection.channel()
    43 
    44 def callback(channel, method, properties, body):
    45     print(body)
    46 
    47 time.sleep(1)
    48 with connection:
    49     channel.basic_consume(
    50         callback, # 消费回调函数
    51         queue='hello', # 队列名
    52         no_ack=True  # 不回应,不阻塞
    53     )
    54     channel.start_consuming()

         两个消费者结果:

              

        三个消费者:

        

     3、发布,订阅模式 

      Publish / Subscribe 发布和订阅

      订阅者和消费者之间还是有一个 exchange

      也就是,每个人 拿到的数据是一样的。

            

       当前模式的exchange 的type是fanout, 就是一对多,即广播模式

      注意:同一个queueu的消息只能被消费一次,所以这里使用了多个queue,相当于为了保证不同的消费者拿到同样的数据,每一个消费者都应该有自己的queueu 

    # 生成一个交换机
    channel.exchange_declare(
        exchange='logs', # 新交换机
        exchange_type='fanout' # 交换机的模式:广播
    )

      生产者使用广播模式,在test虚拟主机下构建了一个logs 交换机

       至于queue, 可以由生产者创建,也可以由 消费者创建

      本次采用使用消费者端创建爱你,生产者 把数据发往交换机logs, 采用了fanout  ,然后将数据通过交换机发往已经绑定到此交换机的所有queue。

          

      绑定 Bingding,建立exchange 和queue之间的联系    

     1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     2 connection = pika.BlockingConnection(params2)
     3 channel = connection.channel()
     4 
     5 # 创建随机名称的queueu
     6 # result = channel.queue_declare() # s生成一个随机名称的queue
     7 # result = channel.queue_declare(exclusive=True) # 生成一个随机名称的queue,并在断开连接时删除queue
     8 
     9 # 生成queue
    10 q1 = channel.queue_declare(exclusive=True)
    11 q2 = channel.queue_declare(exclusive=True)
    12 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
    13 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称
    14 
    15 print(q1name, q2name)
    16 
    17 # 绑定
    18 channel.queue_bind(exchange='blogs', queue=q1name)
    19 channel.queue_bind(exchange='blogs', queue=q2name)

        生产者代码:

        注意观察交换机和队列

     1 # send.py
     2 import  pika
     3 import  time
     4 
     5 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     6 connection = pika.BlockingConnection(params2)
     7 channel = connection.channel()
     8 
     9 with connection:
    10     # 建立通道
    11     channel = connection.channel()
    12     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    13     channel.exchange_declare(
    14         exchange =  'logs', #新交换机
    15         exchange_type = 'fanout'
    16         )
    17 
    18     for i in range(100):
    19         pub = channel.basic_publish(
    20             exchange= "logs" ,# 指定exchange
    21             routing_key='', # 广播模式不用指定
    22             body= '{:02}  hello world'.format(i)  # 消息
    23         )
    24         print(pub, '============')
    25 
    26     print('==== end =====')

        如果先开启生产者,没有消费者,直接dropped 掉,但是生成了exchange

        消费者代码

         构建queue 并绑定到test虚拟主机的logs交换机上

     1 # receive.py
     2 import  pika
     3 import  time
     4 
     5 import  pika
     6 import  time
     7 
     8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     9 connection = pika.BlockingConnection(params2)
    10 channel = connection.channel()
    11 channel.exchange_declare('logs', 'fanout')
    12 
    13 # 生成一个随机名的 queue
    14 q1 = channel.queue_declare(exclusive=True)
    15 q2 = channel.queue_declare(exclusive=True)
    16 print(1, q1, type(q1))
    17 print(2, q1.method, type(q1.method))
    18 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
    19 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称
    20 print(3, q1name)
    21 
    22 # 绑定
    23 channel.queue_bind(queue=q1name, exchange='logs')
    24 channel.queue_bind(queue=q2name, exchange='logs')
    25 
    26 def callback(channel, method, properties, body):
    27     print("{}
    {}".format(channel, method))
    28     print(body)
    29     print('=======================================')
    30 
    31 
    32 with connection:
    33     channel.basic_consume(
    34         callback, # 消费回调函数
    35         queue='q1name', # 队列名
    36         no_ack=True  # 不回应,不阻塞
    37     )
    38     channel.basic_consume(
    39         callback, # 消费回调函数
    40         queue='q2name', # 队列名
    41         no_ack=True  # 不回应,不阻塞
    42     )
    43 
    44     channel.start_consuming()

         看到首先启动了 消费者,创建了exchange,并绑定了queue

        

         

         打印:(部分打印)

    1 1 <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])>"])> <class 'pika.frame.Method'>
    2 2 <Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])> <class 'pika.spec.Queue.DeclareOk'>
    3 3 amq.gen-DfOvsPNZoEK6rWZaYbj8zw
    4 <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('192.168.112.1', 58780)->('192.168.112.111', 5672) params=<URLParameters host=192.168.112.111 port=5672 virtual_host=test ssl=False>>>>
    5 <Basic.Deliver(['consumer_tag=ctag1.4cb1fdc9f859486cac3f1cfa01e476f8', 'delivery_tag=1', 'exchange=logs', 'redelivered=False', 'routing_key='])>
    6 b'00  hello world'

           

         

      q1 = channel.queue_declare(exclusive=True)
      q2 = channel.queue_declare(exclusive=True)

        如果先开启生产者,后开启消费者,部分数据会丢失

       部分数据丢失,是因为,exchange 收到了数据,没有queue接受,所以,exchange 丢弃了这些数据。 

     4、路由Routing

         

         路由其实就是生产者的数据经过exchange 的时候,通过匹配规则,决定数据的去向

        消费者

        生产者代码

        交换机类型为 direct ,指定路由的key   

     1 # send.py
     2 import  pika
     3 import  time
     4 import random
     5 
     6 exchange = 'color'
     7 colors = ('orange', 'black', 'green')
     8 
     9 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    10 connection = pika.BlockingConnection(params2)
    11 channel = connection.channel()
    12 
    13 with connection:
    14     # 建立通道
    15     channel = connection.channel()
    16     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    17     channel.exchange_declare(
    18         exchange = exchange, #新交换机
    19         exchange_type = 'direct'
    20         )
    21 
    22     for i in range(40):
    23         rk = colors[random.randint(0,2)]
    24         msg = "routingKey {} --- data {:02}".format(rk, i)
    25         pub = channel.basic_publish(
    26             exchange= exchange ,# 指定exchange
    27             routing_key=rk, # 广播模式不用指定
    28             body= msg  # 消息
    29         )
    30         print(rk, msg, '============')
    31         time.sleep(0.5)
    32 
    33     print('==== end =====')

         消费者代码:

     1 # receive.py
     2 import  pika
     3 import  time
     4 
     5 exchange = 'color'
     6 colors = ('orange', 'black', 'green')
     7 
     8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
     9 connection = pika.BlockingConnection(params2)
    10 channel = connection.channel()
    11 channel.exchange_declare(exchange, 'direct')
    12 
    13 # 生成一个随机名的 queue
    14 q1 = channel.queue_declare(exclusive=True)
    15 q2 = channel.queue_declare(exclusive=True)
    16 
    17 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
    18 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称
    19 
    20 
    21 # 绑定(一定指定 routing_key)
    22 channel.queue_bind(queue=name1, exchange=exchange, routing_key=colors[0])
    23 channel.queue_bind(name2, exchange,colors[1])
    24 channel.queue_bind(queue=name2, exchange=exchange,routing_key=colors[0])
    25 
    26 def callback(channel, method, properties, body):
    27     print("{}
    {}".format(channel, method))
    28     print(body)
    29     print('=======================================')
    30 
    31 
    32 with connection:
    33     channel.basic_consume(
    34         callback, # 消费回调函数
    35         queue=name2, # 队列名
    36         no_ack=True  # 不回应,不阻塞
    37     )
    38     channel.basic_consume(
    39         callback, # 消费回调函数
    40         queue=name1, # 队列名
    41         no_ack=True  # 不回应,不阻塞
    42     )
    43     channel.start_consuming()

        

          

        如果 routing_key 设置都是一样的:

            

          也就是,将 消费者的 routing_key 比如都设置为black, 则 生产者,只有生产 orange的,才会被消费

          其次,最重要的是, 变成了广播, 类似fanout,都是1对多,但是不同

          因为 fanout时,exchange 不做数据过滤, 1个消息,所有绑定的queue都能拿到一个副本

          direct时,要按照 routing_key 分配数据,上图的black 有2 个 queue设置了,就会把1一个消息分发给2个queue,其他没有black 的,该怎么消费,就怎么消费

    5、Topic 话题

           

       Topic 就是更加高级的路由,支持模式匹配而已

      Topic 的routing_key 必须使用点 号分割的单词组成,最多支持255个字节

      支持使用通配符

      • * 表示严格的一个单词
      • #表示0个 或多个单词

       如果queue绑定的routing_key 只是一个# ,这个queue 其实可以接受所有的消息

      如果没有使用任何 通配符,效果类似direct

      生产者代码:

     1 # send.py
     2 import  pika
     3 import  time
     4 import random
     5 
     6 exchange = 'products'
     7 colors = ('red', 'blue', 'green')
     8 
     9 topics = ('phone.*', '*.red') #  2中话题
    10 product_type = ('phone','pc','tv') # 3中产品
    11 
    12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    13 connection = pika.BlockingConnection(params2)
    14 channel = connection.channel()
    15 
    16 with connection:
    17     # 建立通道
    18     channel = connection.channel()
    19     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
    20     channel.exchange_declare(
    21         exchange = exchange, #新交换机
    22         exchange_type = 'topic'
    23         )
    24 
    25     for i in range(40):
    26         rk = '{}.{}'.format(
    27             product_type[random.randint(0,2)],
    28             colors[random.randint(0,2)]
    29         )
    30         msg = "routingKey {} --- data {:02}".format(rk, i)
    31         pub = channel.basic_publish(
    32             exchange= exchange ,# 指定exchange
    33             routing_key=rk, # 广播模式不用指定
    34             body= msg  # 消息
    35         )
    36         print(rk, msg, '============')
    37         time.sleep(0.5)
    38 
    39     print('==== end =====')

          消费者代码:

     1 # receive.py
     2 import  pika
     3 import  time
     4 
     5 exchange = 'products'
     6 colors = ('red', 'blue', 'green')
     7 
     8 topics = ('phone.*', '*.red') #  2中话题
     9 product_type = ('phone','pc','tv') # 3中产品
    10 
    11 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
    12 connection = pika.BlockingConnection(params2)
    13 channel = connection.channel()
    14 channel.exchange_declare(exchange, 'topic')
    15 
    16 # 生成一个随机名的 queue
    17 q1 = channel.queue_declare(exclusive=True)
    18 q2 = channel.queue_declare(exclusive=True)
    19 
    20 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
    21 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称
    22 
    23 
    24 # 绑定(一定指定 routing_key)
    25 #q1 只收集phone开头的routing-key的消息
    26 channel.queue_bind(queue=name1, exchange=exchange, routing_key=topics[0])
    27 # q2 只收集red结尾的
    28 channel.queue_bind(name2, exchange,topics[1])
    29 
    30 def callback(channel, method, properties, body):
    31     print("{}
    {}".format(channel, method))
    32     print(body)
    33     print('=======================================')
    34 
    35 
    36 with connection:
    37     channel.basic_consume(
    38         callback, # 消费回调函数
    39         queue=name2, # 队列名
    40         no_ack=True  # 不回应,不阻塞
    41     )
    42     channel.basic_consume(
    43         callback, # 消费回调函数
    44         queue=name1, # 队列名
    45         no_ack=True  # 不回应,不阻塞
    46     )
    47     channel.start_consuming()

        

         观察者消费者拿到的数据,注意观察phone.red 的数据出现的次数

        由此,可以知道,交换机在路由消息的时候, 只要和queue的routing_key 匹配,就把消息发给该queue

      RPC远程过程调用

        很少使用,有更好的RPC 框架 

    消息队列的作用:

      1、系统间解耦

      2、解决生产者,消费者速度匹配

      由于稍微上规模的项目都会分层,分模块开发,模块间或系统间,尽量不要直接耦合,需要开放公共接口提供给别的模块使用,而调用可能触发并发问题,为了缓冲和解耦,往往使用中间技术

      

  • 相关阅读:
    Django -- 10.Django和Ajax
    Django -- 9.模型层(2)
    Django -- 8.模型层(1)
    Django -- 7.模板层
    Django -- 6.视图层
    Django -- 5.路由层(URLconf)_基于Django2
    Django -- 4.Django简介
    Django -- 3.web框架
    Delphi中Chrome Chromium、Cef3学习笔记(四)
    Delphi中Chrome Chromium、Cef3学习笔记(三)
  • 原文地址:https://www.cnblogs.com/JerryZao/p/10092202.html
Copyright © 2011-2022 走看看