zoukankan      html  css  js  c++  java
  • RabbitMq

    RabbitMq

          简介:

    RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

        产生的原因:

     对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:

     1 1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
     2 
     3  2)如何降低发送者和接收者的耦合度?
     4 
     5  3)如何让Priority高的接收者先接到数据?
     6 
     7  4)如何做到load balance?有效均衡接收者的负载?
     8 
     9  5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。
    10 
    11  6)如何做到可扩展,甚至将这个通信模块发到cluster上?
    12 
    13  7)如何保证接收者接收到了完整,正确的数据?
    View Code

        AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP

    复习同步与异步的优缺点;

     1 异步 
     2         优点:解决排队问题
     3         缺点: 不能保证任务被及时的执行 
     4         应用场景:去哪儿,
     5     同步
     6         优点:保证任务被及时的执行 
     7         缺点:排队问题
     8         
     9     大并发
    10         Web nginx  10000-20000 
    11             apache 1000-2000
    12         pv= page visit = 上亿 =10server web cluster集群 
    13         
    14         uv = user visit 
    15         
    16         qps = 每秒查询率
    View Code

    队列的作用:    

             1. 存储消息、数据
             2. 保证消息顺序
             3. 保证数据的交付

    为什么用rabbitmq instead of python queue
    是因为python queue 不能跨进程             http://www.cnblogs.com/liuguniang/p/6858259.html

    多线程的情况下,对于公共数据不做复杂的加锁,而采用队列来存储数据,提高线程间资源的安全性
    应用:生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来
    进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,
    阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

         安装:

      1 windows

                 安装rabbitmq之前先到erlang官网下载     http://www.erlang.org/downloads

     

                 到RabbirMq官网下载  http://www.rabbitmq.com/install-windows.html

     

      2  linux:

     1 #Centos7 安装
     2  
     3 #注意/etc/hosts文件 ip和主机名对应
     4 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
     5 yum install epel-release -y
     6 yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm
     7 rabbitmq-plugins enable rabbitmq_management
     8 cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
     9 systemctl restart rabbitmq-server
    10 systemctl status rabbitmq-server
    11  
    12 #创建用户 授权
    13 rabbitmqctl  add_user alex alex3714
    14 rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
    安装过程中根据提示安装依赖包erlang,因为rabbitmq是使用erlang语言实现, 所以需要安装erlang依赖
     1 set_permissions后的参数如下:
     2 vhost
     3 用于授予用户访问权限的虚拟主机的名称,默认为/ 4 alex
     5 用户的名称,以授予对指定虚拟主机的访问权。
     6 conf
     7 为用户授予配置权限的正则表达式
     8 write
     9 为用户授予写权限的正则表达式
    10 read
    11 一个正则表达式   为用户被授予读权限。
    View Code

           linux中rabbitmq的使用:

    1 #查看队列中的元素
    2  rabbitmqctl list_queues
    3 #查看exchange中的元素
    4 rabbitmqctl list_binding

    3 python rabbitMQ module 安装

    pip install pika
    or
    easy_install pika
    or
    源码
       
    https://pypi.python.org/pypi/pika

         原理

    详细介绍:

     1  RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。
     2     Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
     3 
     4     Client 123:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。
     5 
     6      对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。
     7 
     8         Exchanges are where producers publish their messages.
     9 
    10         Queuesare where the messages end up and are received by consumers
    11 
    12         Bindings are how the messages get routed from the exchange to particular queues.
    13 
    14    还有几个概念是上述图中没有标明的,那就是Connection(连接),Channel(通道,频道)。
    15    Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
    16 
    17    Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
    18 
    19     那么,为什么使用Channel,而不是直接使用TCP连接?
    20 
    21     对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,1s的数据可以Publish10K的数据包。当然对于不同的硬件环境,不同的数据包大小这个数据肯定不一样,但是我只想说明,对于普通的Consumer或者Producer来说,这已经足够了。如果不够用,你考虑的应该是如何细化split你的设计。
    View Code

    细节阐明:

     1 4.1 使用ack确认Message的正确传递 
     2    默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer。
     3     如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。
     4 
     5      那么什么是正确收到呢?通过ack。每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:
     6 
     7      RabbitMQ Server会把这个信息发送到下一个Consumer。
     8 
     9     如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。
    10 
    11    而且ack的机制可以起到限流的作用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。
    12 
    13    当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。
    14 
    15 4.2 Reject a message
    16    有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。
    17 
    18 4.3  Creating a queue
    19       Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。
    20 
    21     那么谁应该负责创建这个queue呢?是Consumer,还是Producer?
    22 
    23 如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。
    24 
    25    queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。
    26 
    27 4.4 Exchanges   
    28     从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
    29      有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
    30 
    31 ·        Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
    32 
    33 ·        Fanout exchange: 会向响应的queue广播。
    34 
    35 ·        Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
    36 
    37 4.5 Virtual hosts
    38    每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。
    View Code

    更多介绍详见:http://blog.csdn.net/anzhsoft/article/details/19563091

    向rabbitmq发送消息

    使用pika进行通信链接

     在rabbitmq服务器使用命令查看消息队列

     1 import pika
     2  
     3 # 认证的参数(用户名和密码)
     4 credentials = pika.PlainCredentials('ceshi', 'ceshi123')
     5  
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
     8 # 生成句柄
     9 channel = conn.channel()
    10  
    11 # 创建队列
    12 channel.queue_declare(queue='hello118')
    13  
    14 #exchange为分发消息区域
    15 #routing_key为要指定的消息队列
    16 #body为发送的内容
    17 channel.basic_publish(exchange='',
    18 routing_key='hello118',
    19 body='Hello World!')
    20 print(" [x] Sent 'Hello World!'")
    21 conn.close()
    View Code
    1 [root@oldsyang rabbitmq]# rabbitmqctl list_queues
    2 Listing queues
    3 hello118 1
    View Code

    可以看出,hello118队列里现在有一条消息

    从rabbitmq接收消息

     1 import pika
     2 import time
     3  
     4 # 参数认证
     5 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     8 # 创建句柄
     9 channel = conn.channel()
    10  
    11 # 声明一个接收消息的队列(这样如果发送端在发送前如果检测到这个消息队列,就不在单独创建了,而直接使用)
    12 channel.queue_declare(queue="hello118")
    13  
    14  
    15 #拿取消息后的回调函数
    16 def callback(ch, method, properties, body):
    17 time.sleep(15)
    18 print("[info] Received %s" % body)
    19  
    20 #no_ack=True,一旦从队列里拿到消息,这条消息就从队列里消失掉
    21 channel.basic_consume(
    22 callback, queue="hello118", no_ack=True
    23 )
    24 print(' [*] Waiting for messages. To exit press CTRL+C')
    25 channel.start_consuming()
    View Code

    安全接收此时,先启动消息发送者,然后再分别启动3个消息接收者,通过发送者多发送几条消息,你会发现,这几条消息会被依次分配到各个消息接收者身上

    从上边的例子中,接收者在拿到消息后,执行回调函数,在睡眠15秒的时候,如果突然中途退出了,或者挂掉了,那么消息队列里不存在这条消息了,但是这条消息实时上并未被接收者处理。着当然不是我们所希望的那样。

    对于一条消息,接收者拿到,有可能半路就给挂掉(跑路)了,那么这条消息我们认为并未处理完成,仍然应该存在于消息队列里,换句话说我们希望的是当接收者接收到消息,处理完后给一个反馈,然后再把这条消息从队列里清除掉。

    这里我们在回调函数里,加一个消息确认的操作

    1 def callback(ch, method, properties, body):
    2 print("[info] Received %s" % body)
    3 time.sleep(20)
    4 print("done")
    5 #确认消息处理完毕(这样rabbitmq才会从消息队列里清除掉这条消息)
    6 ch.basic_ack(delivery_tag=method.delivery_tag)
    View Code


    并且,在channel.basic_consume方法里不能再设置no_ack=True,因为no_ack=True的意思就是说,拿走消息,消息队列立即清除。 

    1 channel.basic_consume(
    2 callback, queue="xinqueue"
    3 )
    View Code

    完整版接收端代码: 

     1 import pika
     2 import time
     3  
     4 # 参数认证
     5 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     8 # 创建句柄
     9 channel = conn.channel()
    10  
    11 # 声明一个接收消息的队列(这样如果发送端在发送前如果检测到这个消息队列,就不在单独创建了,而直接使用)
    12 channel.queue_declare(queue="ceshi")
    13  
    14  
    15 # 拿取消息后的回调函数
    16 def callback(ch, method, properties, body):
    17 print("[info] Received %s" % body)
    18 #方便模拟终端操作
    19 time.sleep(20)
    20 print("done")
    21 #确认消息处理完毕(这样rabbitmq才会从消息队列里清除掉这条消息)
    22 ch.basic_ack(delivery_tag=method.delivery_tag)
    23  
    24  
    25 channel.basic_consume(
    26 callback, queue="ceshi"
    27 )
    28 print(' [*] Waiting for messages. To exit press CTRL+C')
    29 channel.start_consuming()
    View Code


    验证一下,先通过发送者往队列ceshi里发送一条消息(”wo shi ceshi “),现在先用接收者1接收,接着用接收者2接收。发现: 

    1. 开始20秒计时
    2. 接收者1接收到消息:wo shi ceshi
    3. 接收者2等待接收指令
    4. 在rabbitmq上查看消息队列ceshi里的消息并未被删除
    5. 中断(ctrl+c)接收者1的链接后,接收者2马上接收到消息wo shi ceshi
    6. 20秒结束,在rabbitmq上查看消息队列ceshi里的消息已经被删除

    从上边可以看出,当消息被接收者1接收到后(还未反馈处理结果),rabbitmq将刚刚那条消息给暂时保存起来,别的接收者是不可再接收此消息的,当接收者1中断操作,没有反馈后,rabbitmq又将消息返回到消息队列里,并被接收者2给拿到了

    消息持久化

    那如果在这个过程中rabbitmq宕机了,不但消息会丢失,而且消息队列都根本没有了。

    执行 services rabbitmq-server restart ,再执行rabbitmqctl list_queues,发现消息队列里都没有了

    1
    2
    [root@oldsyang yangzai]# rabbitmqctl list_queues
    Listing queues

    那如何做到消息不丢失呢,这就需要消息持久化操作。在消息持久化之前必须要做到队列持久化,因为队列没了,消息肯定就不可能存在

    要做队列持久化,只需要在声明队列的时候设置durable=True

    1
    channel.queue_declare(queue="xinqueue", durable=True)

    这里在接收者设置durable=True,那在发送端,也必须声明持续化

    验证一下,通过发送端发送一条消息,然后执行 services rabbitmq-server restart,再执行rabbitmqctl list_queues,发现消息队列里
    xinqueue存在。说明 durable=True设置生效。

    但是,现在来看刚刚发送的那条消息是不在了,接下来要做的就是消息持久化,需要在发消息的时候设置另外一个参数properties

     1 # exchange为分发消息区域
     2 # routing_key为要指定的消息队列
     3 # body为发送的内容
     4 channel.basic_publish(exchange='',
     5 routing_key='xinqueue',
     6 body='Hello 222!',
     7 properties=pika.BasicProperties(
     8 delivery_mode=2 #使消息持久化
     9 )
    10 )
    View Code


    验证一下,现在通过发送端发送一条消息,然后执行 services rabbitmq-server restart,再执行rabbitmqctl list_queues,发现消息队列里xinqueue存在。而且队列里的数据也存在,说明设置生效 

    现在发布这一阶段完整版本代码:

    发送端:

     1 import pika
     2  
     3 # 认证的参数(用户名和密码)
     4 credentials = pika.PlainCredentials('ceshi', 'ceshi123')
     5  
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
     8 # 生成句柄
     9 channel = conn.channel()
    10  
    11 # 创建队列
    12 # durable=True意思是持续化这个队列(不会因为宕机就会消失掉)
    13 channel.queue_declare(queue='xinqueue', durable=True)
    14  
    15 # exchange为分发消息区域
    16 # routing_key为要指定的消息队列
    17 # body为发送的内容
    18 channel.basic_publish(exchange='',
    19 routing_key='xinqueue',
    20 body='Hello 222!',
    21 properties=pika.BasicProperties(
    22 delivery_mode=2 # 消息持久化(不会因为宕机而消失)
    23 )
    24 )
    25 print(" [x] Sent 'Hello World!'")
    26 conn.close()
    View Code


    接收端 

     1 import pika
     2 import time
     3  
     4 # 参数认证
     5 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     8 # 创建句柄
     9 channel = conn.channel()
    10  
    11 # 声明一个接收消息的队列(这样如果发送端在发送前如果检测到这个消息队列,就不在单独创建了,而直接使用)
    12 # durable=True意思是持续化这个队列(不会因为宕机就会消失掉)
    13 channel.queue_declare(queue="xinqueue",durable=True)
    14  
    15  
    16 # 拿取消息后的回调函数
    17 def callback(ch, method, properties, body):
    18 print("[info] Received %s" % body)
    19 time.sleep(20)
    20 print("done")
    21 #确认消息处理完毕(这样rabbitmq才会从消息队列里清除掉这条消息)
    22 ch.basic_ack(delivery_tag=method.delivery_tag)
    23  
    24  
    25 channel.basic_consume(
    26 callback, queue="xinqueue"
    27 )
    28 print(' [*] Waiting for messages. To exit press CTRL+C')
    29 channel.start_consuming()
    30  
    View Code


    注意,消息接收端可以声明消息队列,因为如果是接收端先启动,而发送端未启动,那么接收端检测不到队列xinqueue就会报错,如果我们在接收端也声明了这个消息队列xinqueue,发送端在后启动后,检测到已经有xinqueue这个队列了,就不会再单独创建一个xinqueue队列,而是直接使用就可以。持久化操作其实就是将这些信息存放到硬盘上了

    消息公平分发

    如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

    这里主要是在接收端设置channel.basic_qos(prefetch_count=1)

     1 import pika
     2 import time
     3  
     4 # 参数认证
     5 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     8 # 创建句柄
     9 channel = conn.channel()
    10  
    11 # 声明一个接收消息的队列(这样如果发送端在发送前如果检测到这个消息队列,就不在单独创建了,而直接使用)
    12 # durable=True意思是持续化这个队列(不会因为宕机就会消失掉)
    13 channel.queue_declare(queue="xinqueue",durable=True)
    14  
    15  
    16 # 拿取消息后的回调函数
    17 def callback(ch, method, properties, body):
    18 print("[info] Received %s" % body)
    19 time.sleep(20)
    20 print("done")
    21 #确认消息处理完毕(这样rabbitmq才会从消息队列里清除掉这条消息)
    22 ch.basic_ack(delivery_tag=method.delivery_tag)
    23  
    24 #如果我消息没处理完就不要再给我分配消息了
    25 channel.basic_qos(prefetch_count=1)
    26 channel.basic_consume(
    27 callback, queue="xinqueue"
    28 )
    29 print(' [*] Waiting for messages. To exit press CTRL+C')
    30 channel.start_consuming()
    View Code

    之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,消息发布订阅

    Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息,起到一个过滤和分配的作用

    三种模式:

    fanout: 所有bind到此exchange的queue都可以接收消息

    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    • 表达式符号说明:#代表一个或多个字符,代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等;.a会匹配a.a,b.a,c.a等
    • 使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

    广播(exchange type=fanout)

    发送端代码:

     1 import pika
     2 import sys
     3  
     4 # 认证的参数(用户名和密码)
     5 credentials = pika.PlainCredentials('ceshi', 'ceshi123')
     6  
     7 # 创建链接
     8 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
     9 # 生成句柄
    10 channel = conn.channel()
    11  
    12 # 创建exchange
    13 channel.exchange_declare(exchange='exfanout',
    14 type='fanout')
    15  
    16  
    17 # exchange为分发消息区域
    18 # routing_key为要指定的消息队列,在这里以exchange转发器的形式发消息,这里就不需要写了,
    19 # body为发送的内容
    20 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    21 channel.basic_publish(exchange='exfanout',
    22 routing_key='',
    23 body='Hello 222!',
    24 )
    25 print(" [x] Sent %r" % message)
    26 conn.close()
    View Code


    接收端代码: 

     1 import pika
     2 import time
     3  
     4 # 参数认证
     5 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     6 # 创建链接
     7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     8 # 创建句柄
     9 channel = conn.channel()
    10  
    11 # 声明一个消息转发器
    12 # type=fanout(类型为广播模式)
    13 channel.exchange_declare(exchange='exfanout',
    14 type='fanout')
    15  
    16 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    17 queue_name = result.method.queue
    18  
    19 channel.queue_bind(exchange='exfanout',
    20 queue=queue_name)
    21  
    22  
    23 # 拿取消息后的回调函数
    24 def callback(ch, method, properties, body):
    25 print("[info] Received %s" % body)
    26 # time.sleep(20)
    27 print("done")
    28  
    29  
    30 channel.basic_consume(
    31 callback, queue=queue_name, no_ack=True
    32 )
    33 print(' [*] Waiting for messages. To exit press CTRL+C')
    34 channel.start_consuming()
    View Code


    验证一下:打开两个接收者,接收者1和接收者2,再通过发送端发送消息 

    1. 打开两个接收者后,在rabbitmq上查看到两个随机消息队列,并且都没有消息
    2. 在发送端发送消息后,接收者1和接收者2都收到消息了

    有选择的接收消息(exchange type=direct)

    RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    发送端代码:

     1 import pika
     2 import sys
     3  
     4 # 认证的参数(用户名和密码)
     5 credentials = pika.PlainCredentials('ceshi', 'ceshi123')
     6  
     7 # 创建链接
     8 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
     9 # 生成句柄
    10 channel = conn.channel()
    11  
    12 # 创建exchange
    13 channel.exchange_declare(exchange='exdirect',
    14 type='direct')
    15  
    16  
    17 # exchange为分发消息区域
    18 # routing_key为要指定的消息队列,在这里可以自定义一些关键字,消息将被发往含这些关键字的消息队列
    19 # body为发送的内容
    20 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    21 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    22 channel.basic_publish(exchange='exdirect',
    23 routing_key=severity,
    24 body=message)
    25 print(" [x] Sent %r" % message)
    26 conn.close()
    View Code


    接收端代码: 

     1 import pika
     2 import time
     3 import sys
     4  
     5 # 参数认证
     6 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     7 # 创建链接
     8 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     9 # 创建句柄
    10 channel = conn.channel()
    11  
    12 # 声明一个消息转发器
    13 # type=direct(类型为组播)
    14 channel.exchange_declare(exchange='exdirect',
    15 type='direct')
    16  
    17 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    18 queue_name = result.method.queue
    19  
    20 channel.queue_bind(exchange='exdirect',
    21 queue=queue_name)
    22  
    23 severities = sys.argv[1:]
    24 if not severities:
    25 sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    26 sys.exit(1)
    27  
    28 #逐个进行队列的绑定
    29 for severity in severities:
    30 channel.queue_bind(exchange='exdirect',
    31 queue=queue_name,
    32 routing_key=severity)
    33  
    34 # 拿取消息后的回调函数
    35 def callback(ch, method, properties, body):
    36 print("[info] Received %s" % body)
    37 # time.sleep(20)
    38 print("done")
    39  
    40  
    41 channel.basic_consume(
    42 callback, queue=queue_name, no_ack=True
    43 )
    44 print(' [*] Waiting for messages. To exit press CTRL+C')
    45 channel.start_consuming()
    View Code


    接收者1验证一下:打开两个接收者,

    1
    python consumer_direct.py info

    接收者2

    1
    python consumer_direct.py info

    发送者,当执行python producer_direct.py info ‘hahahah’时,只有接收者1收到消息

    发送者,当执行python producer_direct.py error ‘you are error’时,只有接收者2收到消息

    更细致的消息过滤

    发送者代码:

     1 import pika
     2 import sys
     3  
     4 # 认证的参数(用户名和密码)
     5 credentials = pika.PlainCredentials('ceshi', 'ceshi123')
     6  
     7 # 创建链接
     8 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
     9 # 生成句柄
    10 channel = conn.channel()
    11  
    12 # 创建exchange,指定类型为topic
    13 channel.exchange_declare(exchange='extopic',
    14 type='topic')
    15  
    16 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    17 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    18  
    19 # exchange为分发消息区域
    20 # routing_key为要指定的消息队列,
    21 # body为发送的内容
    22 channel.basic_publish(exchange='extopic',
    23 routing_key=routing_key,
    24 body=message)
    25 print(" [x] Sent %r" % message)
    26 conn.close()
    View Code

    接收者代码:

     1 import pika
     2 import time
     3 import sys
     4  
     5 # 参数认证
     6 credentials = pika.PlainCredentials("ceshi", "ceshi123")
     7 # 创建链接
     8 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
     9 # 创建句柄
    10 channel = conn.channel()
    11  
    12 # 声明一个消息转发器
    13 # type=direct(类型为topic)
    14 channel.exchange_declare(exchange='extopic',
    15 type='topic')
    16  
    17 result = channel.queue_declare(exclusive=True)
    18 queue_name = result.method.queue
    19  
    20 binding_keys = sys.argv[1:]
    21 if not binding_keys:
    22 sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    23 sys.exit(1)
    24  
    25 for binding_key in binding_keys:
    26 channel.queue_bind(exchange='extopic',
    27 queue=queue_name,
    28 routing_key=binding_key)
    29  
    30 # 拿取消息后的回调函数
    31 def callback(ch, method, properties, body):
    32 print("[info] Received %s" % body)
    33 # time.sleep(20)
    34 print("done")
    35  
    36 channel.basic_consume(
    37 callback, queue=queue_name, no_ack=True
    38 )
    39 print(' [*] Waiting for messages. To exit press CTRL+C')
    40 channel.start_consuming()
    View Code

    验证一下:打开两个接收者,

    接收者1

    1
    2
    # 接收所有消息
    python consumer_topic.py #

    接收者2

    1
    2
    # 接收以.error结尾的队列的消息
    python consumer_direct.py #.error

    发送者,当执行python producer_direct.py sds ‘hahahah’时,只有接收者1收到消息

    发送者,当执行python producer_direct.py er.error ‘you are error’时,两个都能接收到消息

  • 相关阅读:
    OSI模型白话
    并发
    初始化与清理
    多线程
    recyclerview Adapter
    recyclerview刷新
    surfaceview
    viewgroup绘制流程
    view配置
    项目遇到的问题
  • 原文地址:https://www.cnblogs.com/liuguniang/p/7309244.html
Copyright © 2011-2022 走看看