zoukankan      html  css  js  c++  java
  • RabbitMQ快速入门python教程

    摘要:HelloWorld 简介 RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。 co...

    HelloWorld

    简介

    RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。

    code

    rabbitmq使用的协议是amqp,用于python的推荐客户端是pika

    1
    pip install pika -i https://pypi.douban.com/simple/

    send.py

    1
    2
    3
    4
    5
    6
    7
    # coding: utf8
    import pika
     
    # 建立一个连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))  # 连接本地的RabbitMQ服务器
    channel = connection.channel()  # 获得channel

    这里链接的是本机的,如果想要连接其他机器上的服务器,只要填入地址或主机名即可。

    接下来我们开始发送消息了,注意要确保接受消息的队列是存在的,否则rabbitmq就丢弃掉该消息

    1
    2
    3
    4
    5
    6
    channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
    channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                      routing_key='hello',  # 发送到该队列 hello 中
                      body='Hello World!')  # 消息内容
     
    connection.close()  # 关闭 同时flush

    RabbitMQ默认需要1GB的空闲磁盘空间,否则发送会失败。

    这时已在本地队列hello中存放了一个消息,如果使用 rabbitmqctl list_queues 可看到

    1
    hello 1

    说明有一个hello队列 里面存放了一个消息

    receive.py

    1
    2
    3
    4
    5
    # coding: utf8
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()

    还是先链接到服务器,和之前发送时相同

    1
    2
    3
    4
    5
    6
    7
    8
    9
    channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错
     
    def callback(ch, method, properties, body):  # 用于接收到消息后的回调
        print(" [x] Received %r" % body)
     
    channel.basic_consume(callback,
                          queue='hello',  # 收指定队列hello的消息
                          no_ack=True)  #在处理完消息后不发送ack给服务器
    channel.start_consuming()  # 启动消息接受 这会进入一个死循环

    工作队列(任务队列)

    工作队列是用于分发耗时任务给多个工作进程的。不立即做那些耗费资源的任务(需要等待这些任务完成),而是安排这些任务之后执行。例如我们把task作为message发送到队列里,启动工作进程来接受并最终执行,且可启动多个工作进程来工作。这适用于web应用,即不应在一个http请求的处理窗口内完成复杂任务。

    1
    2
    3
    4
    5
    6
    channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # 使得消息持久化
                      ))

    分配消息的方式为 轮询 即每个工作进程获得相同的消息数。

    消息ack

    如果消息分配给某个工作进程,但是该工作进程未处理完成就崩溃了,可能该消息就丢失了,因为rabbitmq一旦把一个消息分发给工作进程,它就把该消息删掉了。

    为了预防消息丢失,rabbitmq提供了ack,即工作进程在收到消息并处理后,发送ack给rabbitmq,告知rabbitmq这时候可以把该消息从队列中删除了。如果工作进程挂掉 了,rabbitmq没有收到ack,那么会把该消息 重新分发给其他工作进程。不需要设置timeout,即使该任务需要很长时间也可以处理。

    ack默认是开启的,之前我们的工作进程显示指定了no_ack=True

    1
    channel.basic_consume(callback, queue='hello')  # 会启用ack

    带ack的callback:

    1
    2
    3
    4
    5
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        time.sleep( body.count('.') )
        print " [x] Done"
        ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

    消息持久化

    但是,有时RabbitMQ重启了,消息也会丢失。可在创建队列时设置持久化:
    (队列的性质一旦确定无法改变)

    1
    channel.queue_declare(queue='task_queue', durable=True)

    同时在发送消息时也得设置该消息的持久化属性:

    channel.basic_publish(exchange='',

    1
    2
    3
    4
    5
    routing_key="task_queue",
    body=message,
    properties=pika.BasicProperties(
       delivery_mode = 2, # make message persistent
    ))

    但是,如果在RabbitMQ刚接收到消息还没来得及存储,消息还是会丢失。同时,RabbitMQ也不是在接受到每个消息都进行存盘操作。如果还需要更完善的保证,需要使用publisher confirm。

    公平的消息分发

    轮询模式的消息分发可能并不公平,例如奇数的消息都是繁重任务的话,某些进程则会一直运行繁 重任务。即使某工作进程上有积压的消息未处理,如很多都没发ack,但是RabbitMQ还是会按照顺序发消息给它。可以在接受进程中加设置:

    1
    channel.basic_qos(prefetch_count=1)

    告知RabbitMQ,这样在一个工作进程没回发ack情况下是不会再分配消息给它。

    群发

    一般情况下,一条消息是发送给一个工作进程,然后完成,有时想把一条消息同时发送给多个进程:

    exchange

    发送者是不是直接发送消息到队列中的,事实上发生者根本不知道消息会发送到那个队列,发送者只能把消息发送到exchange里。exchange一方面收生产者的消息,另一方面把他们推送到队列中。所以作为exchange,它需要知道当收到消息时它需要做什么,是应该把它加到一个特殊的队列中还是放到很多的队列中,或者丢弃。exchange有direct、topic、headers、fanout等种类,而群发使用的即fanout。之前在发布消息时,exchange的值为 '' 即使用default exchange。

    1
    channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中

    临时队列

    1
    2
    3
    result = channel.queue_declare()  # 创建一个随机队列
    result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
    queue_name = result.method.queue

    这样result.method.queue即是队列名称,在发送或接受时即可使用。

    绑定exchange 和 队列

    1
    2
    channel.queue_bind(exchange='logs',
                   queue='hello')

    logs在发送消息时给hello也发一份。

    在发送消息是使用刚刚创建的 logs exchange

    1
    2
    3
    channel.basic_publish(exchange='logs',
                   routing_key='',
                   body=message)

    路由

    之前已经使用过bind,即建立exchange和queue的关系(该队列对来自该exchange的消息有兴趣),bind时可另外指定routing_key选项。

    使用direct exchange

    将对应routing key的消息发送到绑定相同routing key的队列中

    1
    2
    channel.exchange_declare(exchange='direct_logs',
                         type='direct')

    发送函数,发布不同severity的消息:

    1
    2
    3
    channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

    接受函数中绑定对应severity的:

    1
    2
    3
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

    使用topic exchange

    之前使用的direct exchange 只能绑定一个routing key,可以使用这种可以拿.隔开routing key的topic exchange,例如:

    1
    "stock.usd.nyse" "nyse.vmw"

    和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

    1
    2
    * 代表1个单词
    # 代表0个或多个单词

    如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

    1
    2
    3
    4
    5
    6
    Q1:
    *.orange.*  对应的是中间的colour都为orange的
     
    Q2:
    *.*.rabbit  对应的是最后部分的species为rabbit的
    lazy.#      对应的是第一部分是lazy的

    qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

     RPC

    在远程机器上运行一个函数然后获得结果。

    1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    1
    2
    3
    4
    5
    6
    7
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

    2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

    注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    channel.basic_publish(exchange='',
                           routing_key='rpc_queue',
                           properties=pika.BasicProperties(
                                 reply_to = self.callback_queue,
                                 correlation_id = self.corr_id,
                                 ),
                           body=str(n))  # 发出调用
     
    while self.response is None:  # 这边就相当于阻塞了
        self.connection.process_data_events()  # 查看回调队列
    return int(self.response)

    3、请求会发送到rpc_queue队列
    4、RPC服务器从rpc_queue中取出,执行,发送回复

    1
    2
    3
    4
    5
    6
    7
    8
    9
    channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求
     
    # 处理之后:
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =
                                                         props.correlation_id),
                     body=str(response))  # 发送回复到回调队列
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

    5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

    1
    2
    if self.corr_id == props.correlation_id:
            self.response = body
  • 相关阅读:
    SpringMVC中静态获取request对象 Spring中获取 HttpServletRequest对象【转载】
    springcloud 的loadbalancer 轮询算法切换方法 2021.4.3
    springboot项目启动增加图标
    rabbitmq 端口作用以及修改方法
    centos8 安装rabbitmq
    springcloud config client Value获取不到信息的问题的处理方法
    springcloud config配置git作为数据源然后启动报错 If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
    Sublime Text的列模式如何操作
    centos8 安装redis
    jQuery简单的Ajax调用
  • 原文地址:https://www.cnblogs.com/saryli/p/9150985.html
Copyright © 2011-2022 走看看