  • Getting message acknowledgements from another thread

    pika/pika: Pure Python RabbitMQ/AMQP 0-9-1 client library
    https://github.com/pika/pika#requesting-message-acknowledgements-from-another-thread

    Requesting message acknowledgements from another thread

    The single-threaded usage constraint of an individual Pika connection adapter instance may result in a dropped AMQP/stream connection due to AMQP heartbeat timeout in consumers that take a long time to process an incoming message. A common solution is to delegate processing of the incoming messages to another thread, while the connection adapter's thread continues to service its I/O loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in a timely fashion.

    Messages processed in another thread may not be acknowledged directly from that thread, since all accesses to the connection adapter instance must be from a single thread, which is the thread running the adapter's I/O loop. Instead, the worker thread requests a callback to be executed in the adapter's I/O loop thread, and that callback performs the acknowledgement. For example, the callback function's implementation might look like this:

    def ack_message(channel, delivery_tag):
        """Note that `channel` must be the same Pika channel instance via which
        the message being acknowledged was retrieved (AMQP protocol constraint).
        """
        if channel.is_open:
            channel.basic_ack(delivery_tag)
        else:
            # Channel is already closed, so we can't acknowledge this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    The code running in the other thread may request the ack_message() function to be executed in the connection adapter's I/O loop thread using an adapter-specific mechanism:

    • pika.BlockingConnection abstracts its I/O loop from the application and thus exposes pika.BlockingConnection.add_callback_threadsafe(). Refer to this method's docstring for additional information. For example:

      connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
    • When using a non-blocking connection adapter, such as pika.adapters.asyncio_connection.AsyncioConnection or pika.SelectConnection, you use the underlying asynchronous framework's native API for requesting an I/O loop-bound callback from another thread. For example, pika.SelectConnection's I/O loop provides add_callback_threadsafe(), pika.adapters.tornado_connection.TornadoConnection's I/O loop has add_callback(), while pika.adapters.asyncio_connection.AsyncioConnection's I/O loop exposes call_soon_threadsafe().
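
The pieces above can be put together roughly as follows for the pika.BlockingConnection case. This is a minimal sketch, not the library's reference implementation: the helper names do_work(), on_message() and run_consumer(), the queue name "work", the "localhost" broker address, and the handler argument are all assumptions made for illustration. Only add_callback_threadsafe(), basic_ack(), basic_consume(), basic_qos(), queue_declare() and start_consuming() are pika calls.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Runs in the connection's I/O loop thread."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, handler):
    """Runs in a worker thread: process the message, then request the ack."""
    handler(body)  # potentially slow, application-specific work (hypothetical)
    # Never touch the channel from this thread; hand the ack to the I/O loop.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection, handler, threads):
    """Consumer callback: runs in the I/O loop thread, so it returns quickly."""
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body, handler))
    worker.start()
    threads.append(worker)


def run_consumer(queue_name="work"):
    """Wire the helpers to a real broker (assumes RabbitMQ on localhost)."""
    import pika  # imported here so the helpers above work without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    channel.basic_qos(prefetch_count=1)  # at most one unacked message at a time
    threads = []
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=functools.partial(
            on_message, connection=connection, handler=print, threads=threads))
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    for worker in threads:
        worker.join()
    connection.close()
```

Calling run_consumer() requires a reachable broker. The design point is that on_message() only starts a thread and returns, so the I/O loop stays free to service heartbeats while handler runs elsewhere.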

    This threadsafe callback request mechanism may also be used to delegate publishing of messages, etc., from a background thread to the connection adapter's thread.
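
Publishing from a background thread can follow the same pattern. The sketch below assumes a pika.BlockingConnection and publishes through the default exchange. The function names publish_message() and request_publish() are hypothetical, while basic_publish() and add_callback_threadsafe() are the pika calls involved.

```python
import functools


def publish_message(channel, routing_key, body):
    """Runs in the connection's I/O loop thread."""
    if channel.is_open:
        channel.basic_publish(exchange="", routing_key=routing_key, body=body)


def request_publish(connection, channel, routing_key, body):
    """Safe to call from any background thread: it only schedules the publish."""
    connection.add_callback_threadsafe(
        functools.partial(publish_message, channel, routing_key, body))
```

As with acknowledgements, the background thread never calls a channel method itself. The publish happens the next time the connection's thread processes its pending callbacks.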

  • Original article: https://www.cnblogs.com/rsapaper/p/11011889.html