zoukankan      html  css  js  c++  java
  • 【python】-- RabbitMQ 安装、基本示例、轮询机制

    RabbitMQ

    MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,以及在python下的各种应用。

    python中的queue概念:

    1. 线程queue:只是用于多个线程之间,进行数据同步交互的。
    2. 进程queue:只是用户父进程与子进程进行交互,或者属于同一父进程下的多个子进程进行交互。

    一、安装:

    1、windows:

    RabbitMQ依赖的语言 erlang:下载

    RabbitMQ软件:下载

    windows下实现远程访问RabbitMQ:查看

    2、Linux:

    Linux:安装步骤

    3、python下使用RabbitMQ:

    RabbitMQ的python使用文档 :查看

    安装python rabbitMQ modul: pip3 install pika

    pika演示文档:查看

    二、基本示例:

    1、队列通信图示意:

     

    2、代码示例:

    2.1、producer(发送消息)

    建立socket->声明管道->声明queue->通过一个exchange 发送内容至queue->关闭连接

    import pika
    
    # 声明一个socket 实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道
    channel = connect.channel()
    # 声明queue名称为test
    channel.queue_declare(queue="test")
    
    #RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
    channel.basic_publish(exchange='',
                          routing_key="test",#queue名称
                          body="hello word") #发送给消费者的消息
    
    print("Sent 'hello world'")
    
    connect.close()
    

    2.2 consumers(接受消息)

    创建socket连接->声明管道->声明queue->创建回调函数callback接受消息->开启不停消费

    import pika
    
    # 声明socket实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道  虽然在之前的produce代码中声明过一次管道,
    # 但是在不知道produce中的管道是否运行之前(如果未运行,consumers中也不声明的话就会报错),
    # 在consumers中也声明一次是一种正确的做法
    channel = connect.channel()
    
    #声明管道
    channel.queue_declare(queue="test")
    
    
    #回调函数
    def callback(h, method, properites, body):
        print("-----", h, method, properites, body)
        print("Received %r" % body)
        #ch.basic_ack(delivery_tag=method.delivery_tag) 当no_ack=Flase时,手动确认收到消息
        
    channel.basic_consume(callback, #回调函数
                          queue="test", #queue名称
                          no_ack=True) # 不确认消息是否被接收
    
    print("Waiting for messages")
    #这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住
    channel.start_consuming()
    

      

    #输出

    Waiting for messages
    ----- <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 60127, 0, 0)->('::1', 5672, 0, 0) 
    params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>
    <Basic.Deliver(['consumer_tag=ctag1.0fee7eb3d9e14d838c5676de6768991b', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=test'])> <BasicProperties>
    b'hello word' Received b'hello word'

    callback中的ch,method,properites分别是:

    • ch:是send端管道的内存对象的地址
    • method:指的send端的是发给谁,发给哪个queue的一些信息
    • properites:send端的属性,这边指的send端发过来给recive端的属性
    • body:是send端发过来的消息

     3、远程访问RabbitMQ

    import pika
    # 全局变量
    RabbitMQ_IP = '192.168.116.1'
    RabbitMQ_PORT = 5672
    RabbitMQ_USER = 'test'
    RabbitMQ_PASSWORD = '123456'
    login_broker = pika.PlainCredentials(RabbitMQ_USER, RabbitMQ_PASSWORD)
    conn_param = pika.ConnectionParameters(RabbitMQ_IP, RabbitMQ_PORT, credentials=login_broker)
    conn = pika.BlockingConnection(conn_param)
    channel = conn.channel()
    

    三、RabbitMQ的轮询机制

    1、消息轮询

    在这种模式下,RabbitMQ会默认把(producer)发的消息依次分发给各个消费者(consumers),跟负载均衡差不多

    1个生产者  ----> 3个消费者

    ①初始化状态:3个消费者都在等待生产者发消息

    ②生产者发第1条消息:只有第1个消费者受到消息,第2个和第3个消费者没有收到消息

    ③生产者发第2条消息:只有第2个消费者受到消息,第1个和第3个没有收到的消息

    ④生产者发第3条消息:只有第3个收到消息,第1个和第2个没有收到消息。

    2、no_ack参数分析( 全称no acknowlargement)

    no_ack 的用途:确保 message 被 consumer “成功”处理了。这里“成功”的意思是,(在设置了 no_ack=false 的情况下)只要 consumer 手动应答了 Basic.Ack ,就算其“成功”处理了。 

    2.1、no_ack = Ture (自动应答)代表服务器不关心produce 发出的消息是否被consumers收到(消费):

          在这种情况下,consumer 会在接收到 Basic.Deliver + Content-Header + Content-Body 之后,立即回复 Ack 。而这个 Ack 是 TCP 协议中的 Ack 。此 Ack 的回复不关心 consumer 是否对接收到的数据进行了处理,当然也不关心处理数据所需要的耗时

    三个消费者,一个生产者(前两个消费者,time.sleep()没有被注释,用来模拟两个宕机的客户端,第三个消费者,注释掉了time.sleep(),用来模拟正常的客户端)

    ①初始化状态:3个消费者都在等待生产者发消息

    ②生产者发第1条消息:前两个宕机个消费者没有受到消息,第3个消费者正常的消费者也没有收到消息

    ③结束第一个和第二个消费者进程(宕机客户端),第三个消费者(没有宕机的客户端)没有收到消息

    2.2、no_ack = Flase (手动应答)代表服务器关心produce 发出的消息是否被consumers收到(消费)

    在这种情况下,要求 consumer 在处理完接收到的 Basic.Deliver + Content-Header + Content-Body 之后才回复 Ack 。而这个 Ack 是 AMQP 协议中的 Basic.Ack 。此 Ack 的回复是和业务处理相关的,所以具体的回复时间应该要取决于业务处理的耗时。

    三个消费者,一个生产者(前两个消费者,time.sleep()没有被注释,用来模拟两个宕机的客户端,第三个消费者,注释掉了time.sleep(),用来模拟正常的客户端)

    ①初始化状态:3个消费者都在等待生产者发消息

    ②生产者发第1条消息:前两个宕机个消费者没有受到消息,第3个消费者正常的消费者也没有收到消息

    ③结束第一个和第二个消费者进程(宕机客户端),第三个消费者(没有宕机的客户端)收到消息

  • 相关阅读:
    poj 1113 Wall 凸包的应用
    NYOJ 78 圈水池 (入门级凸包)
    Monotone Chain Convex Hull(单调链凸包)
    poj Sudoku(数独) DFS
    poj 3009 Curling 2.0(dfs)
    poj 3083 Children of the Candy Corn
    Python join()方法
    通过FISH和下一代测序检测肺腺癌ALK基因融合比较
    华大病原微生物检测
    NGS检测ALK融合大起底--转载
  • 原文地址:https://www.cnblogs.com/Keep-Ambition/p/8038976.html
Copyright © 2011-2022 走看看