zoukankan      html  css  js  c++  java
  • rabbitMQ和pika模块

    rabbitMQ

    rabbit角色

    • management
    • policymaker
    • monitoring
    • administrator

    添加用户并分配角色

    rabbitmqctl add_user name pass

    rabbitmqctl set_user_tags name administrator

    操作可用web控制台进行

    绑定用户到vhost并开放连接权限

    rabbitmqctl set_permissions -p vhost名字 user名字 允许所有的连接

    rabbitmqctl set_permissions -p qpm qpm ".*" ".*" ".*"
    

    操作可用web控制台进行

    pika模块

    python中想要使用rabbitMQ可以利用pika进行操作。

    单个生产者对应单个消费者

    生产者:

    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.x.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 声明queue
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange="",
                          routing_key="hello",
                          body="hello world")
    
    print("send message!")
    
    connection.close()
    

    消费者:

    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.x.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 消费者再次声明queue
    """
    生产者已经声明了queue,为何消费者还要再次声明?
    假如生产者还没建立queue,那消费者这边就无法去queue中等待接收消息,为了防止出现问题,生产者未建立queue的情况下,消费者先建立queue,并去等待消息。
    """
    channel.queue_declare(queue='hello')
    
    
    # 回调函数
    def callback(ch, method, properties, body):
        # ch是通道的实例
        print(ch)
        # 生产者发送消息携带的参数
        print(method)
        print(properties)
        # 消息的内容
        print(body)
        # 如果不加的话,消费者会hang住,监听到有值就调用callback,没值就等待监听消息
        # channel.close()
    
    
    # 从hello的queue中拿数据,然后调用回调函数
    channel.basic_consume(queue="hello",
                          on_message_callback=callback,
                          auto_ack=True)
    
    
    print("waiting for messages")
    
    channel.start_consuming()
    

    单个生产者对应多个消费者

    一个生产者有多个消费者的情况下,rabbitMQ默认的机制是公平轮流分发给消费者,按照上面的代码开启两个消费者和一个消费者,消费者的消息会轮流的分发给两个消费者。

    rabbitMQ消费端的确认和拒绝(安全性)

    假如消费者接收到消息中任务,在执行的过程中挂掉(断开连接)了,那么该任务就相当于丢失(没有获得结果,消息也被取出来了,生产者也以为该消息被消费了),因此为了保证消息的安全性,我们需要做到当消费者没有执行完任务的时候,该消息依然存在,会发送给另一个消费者,或者重连后的消费者。

    # 回调函数
    def callback(ch, method, properties, body):
        print("received message")
        time.sleep(20)
        print(body)
    

    在上面的代码中,为消费者增加一个time.sleep,启动消费者代码,再启动生产者代码,在消费者代码睡眠的时间段,终止消费者代码,然后再重启消费者代码,消费者端已经无法再次获取到该消息了(消息已经被前一个消费者给取出来了,但任务没执行完,就挂掉了,相当于该消息丢失)。

    在收到消息后,消息将立即保留在队列中。为防止其他使用者再次处理消息,Amazon SQS 设置了可见性超时,这是 Amazon SQS 防止其他使用者接收和处理消息的时间段。消息的默认可见性超时为 30 秒。最小值为 0 秒。最大值为 12 小时。

    可见性超时从 Amazon SQS 返回消息时开始。在这段时间里,使用者可以处理和删除消息。但是,如果使用者在删除消息之前失败,并且您的系统没有在可见性超时结束之前对该消息调用 DeleteMessage 操作,则其他使用者将可以看到该消息并且再次接收该消息。如果某条消息只能被接收一次,则您的使用者应在可见性超时期间内删除该消息。

    这个消息被取走,会有一个可见性超时记录,这个消息实际还在队列但是会隐藏,但是你处理完成,返回 DeleteMessage ,证明消息完成了,到了时间设定却没返回 DeleteMessage,这个消息会重新被获取。

    ​ ——aws SQS

    如何实现消息的安全性?

    rabbitMQ中引入了ack机制,消费者在订阅队列的时候可以指定autoAck参数。

    当autoAck等于True时,rabbitMQ会自动把发送出去的消息设置为确认(直接删除),而不管消费者是否真正的消费到了这些数据。

    当autoAck等于False时,rabbitMQ会等待消费者显式地回复确认信号才从移除消息(实际上时先打上删除标记,收到确认,删除标记),因此消费者可以有足够的时间来处理消息,而不用担心消费过程中异常退出而导致消息丢失,因为rabbitMQ会一直持有消息,知道消费者调用basic.ack为止。

    unacked为1,代表着有一个消息没有被消费

    因此,在python中,利用pika模块操作,只需要在消费者添加两个参数:

    • auto_ack=False:关闭自动确认
    • ch.basic_ack(delivery_tag=method.delivery_tag):确认消息
    • ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False):拒绝消息
    # 生产者代码
    
    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 声明queue
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange="",
                          routing_key="hello",
                          body="hello world",
                          # properties=pika.BasicProperties(
                          #     delivery_mode=2,  # 消息的持久化
                          # )
                          )
    
    print("send message!")
    
    connection.close()
    
    # 消费者代码
    
    import pika
    import time
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 消费者再次声明queue
    """
    生产者已经声明了queue,为何消费者还要再次声明?
    假如生产者还没建立queue,那消费者这边就无法去queue中等待接收消息,为了防止出现问题,生产者未建立queue的情况下,消费者先建立queue,并去等待消息。
    """
    channel.queue_declare(queue='hello')
    
    
    # 回调函数
    def callback(ch, method, properties, body):
        print(body)
        time.sleep(10)
        #  delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print("任务执行完成!")
        """
        消费者在接收到消息之后,还可以拒绝消息,我们只需要调用basic_reject就可以,如下:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        """
        # 如果不加的话,消费者会hang住,监听到有值就调用callback,没值就等待监听消息
        # channel.close()
    
    
    # 从hello的queue中拿数据,然后调用回调函数
    channel.basic_consume(queue="hello",
                          on_message_callback=callback,
                          auto_ack=False
                          )
    
    
    print("waiting for messages.....")
    
    channel.start_consuming()
    

    rabbitMQ生产端的确认和拒绝(安全性)

    如果在发送方发出消息后,如果exchange写错了,或者没有任何队列绑定我们发送的exchange,那么在这时候发送方对此浑然不知,因此引入了confirm(确认)模式。

    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbit就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样就使生产者知道消息已经确证到达目的地了。

    如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理,如下图所示:

    通过调用channel.confirm_delivery()就可以将信道设置为comfirm模式。

    import pika
    
    user_pwd = pika.PlainCredentials('admin', 'admin')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', credentials=user_pwd))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
    
    channel.queue_declare(queue='hello', durable=True)
    channel.queue_bind(exchange='testexchange', queue='hello')
    
    channel.confirm_delivery()
    
    for i in range(10):
        channel.basic_publish(exchange='testexchange123', routing_key='hello', body='Hello World!{}'.format(i),
                              mandatory=True,
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # 消息持久化
    
                              ))
        print(" [x] Sent 'Hello World!{}'".format(i))
    connection.close()
    

    现在,如果我们写错了交换机,在发送的时候就会报错:pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no exchange 'testexchange123' in vhost '/'")。 而mandatory=True参数来确认发出的消息是否有queue接收, 并且所有queue都成功接收,如果没有绑定队列在交换机上的话,在发送的时候就会报错:pika.exceptions.UnroutableError: 1 unroutable message(s) returned

    发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果rabbitmq因为自身内部错误导致消息丢失,就会发送一条nack(basic.nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。

    rabbitMQ的持久化

    在RabbitMQ中,如果遇到RabbitMQ服务停止或者挂掉,那么我们的消息将会出现丢失的情况,为了在RabbitMQ服务重启的情况下,不丢失消息,我们可以将Exchange(交换机)、Queue(队列)与Message(消息)都设置为可持久化的(durable)。这样的话,能够保证绝大部分的消息不会被丢失,但是还有有一些小概率会发生消息丢失的情况。

    简而言之就是,防止一些没有被消费者取走的消息,在rabbitMQ宕机重启之后,这些消息还能恢复。

    队列持久化

    让队列持久化,在声明queue的时候,加上durable=True

    # 声明queue
    channel.queue_declare(queue='hello', durable=True)
    

    需要注意,生产者声明的队列和消费者声明的队列是否持久化要保持一致!否错会报错!

    队列是持久化的队列,不能被修改为不持久化,同理,不持久化的队列同样不能修改为持久化。

    只能删除不持久化的队列,重新创建持久化的队列。

    try:
        # 声明queue
        channel.queue_declare(queue='hello', durable=True)  # 如果队列是不持久化的此处会报错
    except Exception as e:
        channel = connection.channel()
        channel.queue_delete(queue='hello')  # 删除不持久化的队列
        channel.queue_declare(queue='hello', durable=True)  # 重新声明持久化的队列
    

    消息持久化

    让消息持久化,就在发送消息的地方增加持久化属性:delivery_mode=2

    channel.basic_publish(exchange="",
                          routing_key="hello",
                          body="hello world",
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息的持久化
                          )
                          )
    

    需要注意:消息持久化的基础是建立于队列的持久化,如果队列都不持久化,那即使消息被设置成了持久化,rabbitMQ重启之后,队列肯定会没有,那消息也会丢失。

    # 生产者代码
    
    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    
    channel.queue_declare(queue='hello', durable=True)
    
    channel.basic_publish(exchange="",
                          routing_key="hello",
                          body="hello world",
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息的持久化
                          )
                          )
    
    print("send message!")
    
    connection.close()
    
    # 消费者代码
    
    import pika
    import time
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 消费者再次声明queue
    channel.queue_declare(queue='hello', durable=True)
    
    
    # 回调函数
    def callback(ch, method, properties, body):
        print(body)
        print(method.delivery_tag)
        # time.sleep(10)
        #  delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print("任务执行完成!")
        """
        消费者在接收到消息之后,还可以拒绝消息,我们只需要调用basic_reject就可以,如下:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        """
        # 如果不加的话,消费者会hang住,监听到有值就调用callback,没值就等待监听消息
        # channel.close()
    
    
    # 从hello的queue中拿数据,然后调用回调函数
    channel.basic_consume(queue="hello",
                          on_message_callback=callback,
                          auto_ack=False
                          )
    
    
    print("waiting for messages.....")
    
    channel.start_consuming()
    

    rabbitMQ的发布和订阅

    上面的列子中,我们发送消息和接收消息都是一对一,消息是在一个queue中传输的,但有的时候,我们想达到让一个消息被所有的queue收到,类似广播的效果,这时候就要用到exchange了。

    广播在一直播放,当打开录音机的时候可以收到,关掉录音机的时候就收不到,是一种实时的获取方式,且没有所谓的历史记录。

    rabbitMQ的发布和订阅,就类似广播,当消费者没有开始接收消息的时候,rabbitMQ的发布/订阅模式不会保留消息,等待消费者上线接收消息。

    Exchange在定义的时候是有类型的,以决定到底哪些queue符号条件,可以接收消息

    • fanout:所有bind到此exchange的queue都可以接收消息——广播
    • direct:通过routingKey和exchange决定那一个组的queue可以接收消息——组播
    • topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息——根据特征自定义
    表达式符号说明:#代表一个或多个字符,*代表任何字符
    
    例:#.a会匹配a.a, aa.a, aaa.a等
    *.a会匹配a.a, b.a, c.a等
    
    注:使用RoutingKey为#, Exchange Type为topic的时候相当于使用fanout
    
    • headers:通过headers来决定把消息发给哪些queue——通过消息头来定义,一般不用

    广播

    假如我们linux上的日志文件需要发送给所有的人,那么就可以使用广播

    生产者需要声明一个exchange和exchange的类型,并使用exchange发送消息

    # 生产者代码
    
    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    # 声明一个exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)
    
    # 发送配置为exchange
    channel.basic_publish(exchange="logs",
                          routing_key="",
                          body="exchange message",
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息的持久化
                          )
                          )
    
    print("send message!")
    
    connection.close()
    

    消费者同样需要声明一个exchange,并将queue绑定到exchange。

    # 消费者代码
    
    import pika
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)
    
    # 消费者再次声明queue
    result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print("waiting for logs......")
    
    
    def callback(ch, method, properties, body):
        print(body)
    
    
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    
    channel.start_consuming()
    

    组播

    假如日志不仅要发给所有人,还需要分组,分为错误日志,正常日志,告警日志等组,这里就需要用到分组了。

    生产者需要将exchange的类型改为direct

    # 生产者代码
    
    import pika
    import sys
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 严重程度,级别
    
    message = ''.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange="direct_logs",
                          routing_key=severity,
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息的持久化
                          )
                          )
    
    print("send message!")
    
    connection.close()
    
    # 消费者代码
    
    import pika
    import sys
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)
    
    # 消费者再次声明queue
    result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    
    if not severities:
        sys.stderr.write('Usage: %s [info] [warning] [error]
    ' % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
    
    print("waiting for logs......")
    
    
    def callback(ch, method, properties, body):
        print(method.routing_key)
        print(body)
    
    
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    
    channel.start_consuming()
    

    自定义组播

    topic,通过在routing_key中自定义匹配符,形成表达式,进行组播。
    启动命令:
    python sender.py mysql.error err happend

    
    # 生产者
    import pika
    import sys
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' # 严重程度,级别
    
    message = ''.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange="topic_logs",
      routing_key=routing_key,
      body=message)
    
    print("send message!")
    
    connection.close()
    

    启动命令:
    python receive.py #:会接受所有的生产者发送的信息
    python receive.py mysql.*:会匹配mysql.开头的生产者发送的信息

    # 消费者
    
    import pika
    import sys
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    
    # 消费者再次声明queue
    result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    
    if not binding_keys:
        sys.stderr.write('Usage: %s [binding_key]
    ' % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
    
    print("waiting for logs......")
    
    def callback(ch, method, properties, body):
        print(method.routing_key)
        print(body)
    
    channel.basic_consume(queue=queue_name,
      on_message_callback=callback,
      auto_ack=True)
    
    channel.start_consuming()
    

    rabbitMQ的消息RPC

    上面的都是单向的,生产者向消费者发送消息,如果生产者和消费者互相通信,就是RPC(remote producre call)远程过程调用。

    客户端循环取值方案:

    # server端代码
    
    
    import pika
    import subprocess
    
    # 用户名和密码
    credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
    
    # 连接到rabbitMQ服务器,建立socket通道(virtual_host: 用户绑定的vhost)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm",
    ))
    
    # 在socket通道之上建立了rabbit协议的通道
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def cmd_exec(cmd):
        try:
            print(cmd)
            res = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if res.returncode == 0:
                try:
                    return res.stdout.decode('utf-8')
                except:
                    return res.stdout.decode('gbk')
            try:
                return res.stderr.decode('utf-8')
            except:
                return res.stderr.decode('gbk')
        except Exception as e:
            return str(e)
    
    def on_request(ch, method, props, body):
        cmd = body.decode('utf-8')
    
        response = cmd_exec(cmd)
    
        ch.basic_publish(exchange='',
      routing_key=props.reply_to,
      properties=pika.BasicProperties(correlation_id=props.correlation_id),
      body=response)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)  # 消费者一次只取一个任务,谁先完成谁继续取,处理不完别来找我
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    
    print("等待命令....")
    channel.start_consuming()
    
    
    # 客户端
    
    
    import pika
    import uuid
    
    class FibonacciRpcClient:
        def __init__(self):
            """
      初始化函数的时候就建立管道,接收服务端的任务结果
     """  credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        "123.56.162.92", credentials=credentials, virtual_host="qpm"))
    
            self.channel = self.connection.channel()
    
            # 建立随机管道,用于告诉服务端,任务的结果放在这个随机管道中
      result = self.channel.queue_declare('', exclusive=True)
    
            self.callback_queue = result.method.queue
    
            # 从随机管道中取任务
      self.channel.basic_consume(
                queue=self.callback_queue,
      on_message_callback=self.on_response, # 回调函数
      auto_ack=True,
      )
    
        # 收到任务结果的回调函数
      def on_response(self, ch, method, props, body):
            # 如果客户端的随机字符串和服务端发送过来的随机字符串相等,就代表着该结果属于该任务
      if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, cmd):
            """
      :param cmd:  :return:
     exchange: 交换器
     routing_key: 是管道的名字
     reply_to: 告诉服务端执行完命令把结果丢到哪个管道中
     """  self.response = None
      self.corr_id = str(uuid.uuid4())  # 唯一标识符, 用于标识服务端的结果和客户端命令之间的联系,防止服务端和客户端命令和结果不对等
      self.channel.basic_publish(exchange="",
      routing_key='rpc_queue',
      properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
      correlation_id=self.corr_id,
      ),
      body=str(cmd))
            count = 0
      while self.response is None:
                self.connection.process_data_events(time_limit=3)  # 检查队列中有没有新消息,没加time_limit代表不会阻塞,加了之后会进入阻塞态
      count += 1
      print(f"check {count}")
    
            return self.response  # type: bytes
    
    fibonacci_rpc = FibonacciRpcClient()
    print("等待接收结果.....")
    
    response = fibonacci_rpc.call("dir")
    print("返回的结果是:%r" % response.decode('utf-8'))
    
  • 相关阅读:
    将两个数组对比后合并为同一个数组
    invalid reference format: repository name must be lowercase
    Error: too many open files之ulimt
    vim打开文件末尾带有^M
    双层for循环体里,分别跳出外层循环和内层循环
    echarts 多饼图集合多标题
    近1个月订单占比城市TOP6
    javascript 显示日期
    国密SM2,SM4 For Delphi xe 10.3.3
    Datasnap POST 方案
  • 原文地址:https://www.cnblogs.com/cnhyk/p/13474299.html
Copyright © 2011-2022 走看看