zoukankan      html  css  js  c++  java
  • RabbitMQ异步收发

    pika提供了支持异步发送模式的selectconnection方法支持异步发送接收(通过回调的方式)。

    connectioon建立时回调建立channel, channel建立时一次回调各种declare方法,declare建立时依次回调publish。

    同使用blockconnection方法相比,通过wireshark抓包来看,使用 异步的方式会对发包进行一些优化,会将几个包合并成一个大包,然后做一次ack应答从而提高效率,与之相反使用blockconnection时将会做至少两次ack,head一次content一次等。

    因此再试用异步的方式时会获得一定的优化。

      1.异步发送publish:

    import pika
    import time as timer
    from time import time
    
    connection = None
    
    
    def on_open(conn):
        conn.channel(on_open_callback=on_channel_open)
    
    
    def on_channel_open(channel):
        message = 'message body value'
        for i in range(5):
            channel.basic_publish('',
                                  'hello-01',
                                  message,
                                  pika.BasicProperties(content_type='text/plain',
                                                       delivery_mode=1))
    
        connection.close()
    
    
    credentials = pika.PlainCredentials("wjq", "1234")
    parameters = pika.ConnectionParameters(host='192.168.139.128', credentials=credentials)
    
    connection = pika.SelectConnection(
        parameters=parameters, on_open_callback=on_open)
    try:
        connection.ioloop.start()
        connection.ioloop.poller.open = False
        connection.close()
    except KeyboardInterrupt:
        connection.close()
        connection.ioloop.start()

      2.异步接收consum:

    import pika
    import logging
    import traceback
    import time as timer
    from time import time
    
    mylog = logging.getLogger('pika')
    mylog.setLevel(logging.ERROR)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    mylog.addHandler(ch)
    
    
    def on_open(connection):
        connection.channel(on_open_callback=on_channel_open)
    
    
    channelg = None
    begin_time = time()
    
    # 回调处理函数
    def on_message(unused_channel, basic_deliver, properties, body): print("body>>>", body.decode("utf-8")) timer.sleep(0.1) unused_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) def on_channel_open(channel): try: channel.basic_consume('hello-01', on_message, False) # channel.start_consuming() except (Exception,): print(traceback.format_exc(), ">>>") credentials = pika.PlainCredentials("wjq", "1234") parameters = pika.ConnectionParameters(host='192.168.139.128', credentials=credentials) connection = pika.SelectConnection( parameters=parameters, on_open_callback=on_open) try: connection.ioloop.start() except KeyboardInterrupt: connection.close() connection.ioloop.start()

    参考地址:

      https://www.jianshu.com/p/a4671c59351a   -- 建议

      https://www.cnblogs.com/cwp-bg/p/8426188.html

      https://blog.51cto.com/8415580/1351328

  • 相关阅读:
    寒假记录九
    寒假记录八
    寒假记录七
    Powershell 检测USB存储设备
    [轉載] AttributeError: module 'comtypes.gen.UIAutomationClient' has no attribute 'IUIAutomation'
    使用Pyinstaller对Python文件打包
    Python使用uiautomation实现Windows平台自动化
    Python 相對路徑
    Python使用Win32com實現 Excel多個Sheet截圖
    JavaScript 提取网页数据
  • 原文地址:https://www.cnblogs.com/WiseAdministrator/p/12594456.html
Copyright © 2011-2022 走看看