zoukankan      html  css  js  c++  java
  • pika详解(二) BlockingConnection

    pika详解(二) BlockingConnection

    BlockingConnection提供了最通用的连接方式

    提供两个类: BlockingConnection 和 BlockingChannel

    class BlockingConnection(object):
    	def __init__(self, parameters=None, _impl_class=None):
    		...
     

     

    BlockingConnection是在pika异步的基础上提供的阻塞方法, 调用的是 AMQP协议的 Basic.Deliver and Basic.Return

    在使用basic_consume接收消息, 使用basic_publish发送消息的时候仍然可以实现异步

    为防止递归调用或者阻塞, blocking连接/channel 在上下文切换中实现 队列的asynchronously-delivered事件(异步通知事件), 比如在等待BlockingConnection.channel或 BlockingChannel.queue_declare时, 一旦实现嵌套的上下文, 将会同步(synchronously)调用它们, 这涉及到所有的回调函数:

    1.lockingConnection.add_on_connection_blocked_callback,

    2.BlockingConnection.add_on_connection_unblocked_callback, 3.BlockingChannel.basic_consume 等

    避免死锁, 一直夯住: 但rabbitmq资源不足的时候, 当去连接rabbitmq的时候, rabbitmq会告诉客户端Connection.Blocked, 然后rabbitmq会暂停处理连接,直到有资源分配进行处理, 这会影响BlockingConnection和BlockingChannel

    比如用户在basic_publish 使用非发布确认机制下, 遇上rabbitmq暂停处理连接,将会一直阻塞住,用户回调也不会被执行, 可能引起系统宕机, 解决办法是:

    在BlockingConnection初始化时配置blocked_connection_timeout连接参数

    类主要的函数方法及说明:

    class BlockingConnection(object):
        def __init__(self, parameters=None, _impl_class=None):
            """Create a new instance of the Connection object.
            :param None | pika.connection.Parameters | sequence parameters:
                Connection parameters instance or non-empty sequence of them. If
                None, a `pika.connection.Parameters` instance will be created with
                default settings. See `pika.AMQPConnectionWorkflow` for more
                details about multiple parameter configurations and retries.
            :param _impl_class: for tests/debugging only; implementation class;
                None=default
            :raises RuntimeError:
    
            """
    
        def add_on_connection_blocked_callback(self, callback):
            回调以便在连接被阻塞(从RabbitMQ接收到Connection.Blocked)时收到通知,
            在这种状态下,RabbitMQ暂停处理传入数据,直到连接被解除阻塞,
            因此接收此通知的发布者暂停发布直到连接被解除阻塞, 可以调用 
            ConnectionParameters.blocked_connection_timeout 添加超时
    
        def add_on_connection_unblocked_callback(self, callback):
            回调,以便在连接被解除阻塞时收到通知
    
        def call_later(self, delay, callback):
            pass
    
        def add_callback_threadsafe(self, callback):
            """
                connection.add_callback_threadsafe(
                functools.partial(channel.basic_ack, delivery_tag=...))
            """
            回调
    
        def remove_timeout(self, timeout_id):
            移除超时
    
        def close(self, reply_code=200, reply_text='Normal shutdown'):
        	reply_code(int) - 关闭的代码
    		reply_text(str) - 关闭的文本原因
    
    
        def process_data_events(self, time_limit=0):
            pass
    
        def sleep(self, duration):
            延迟
    
        def channel(self, channel_number=None):
            建立channel通道   channel_number 整数 要使用的通道编号,默认为下一个可用通道编号
    
        @property
        def is_closed(self):
            返回bool值
    
        @property
        def is_open(self):
            返回bool值
    
        @property
        def basic_nack_supported(self):
            返回bool值 , 指定服务器是否支持活动连接上的basic.nack
    
        @property
        def consumer_cancel_notify_supported(self):
            返回bool值   服务器是否支持活动连接上的使用者取消通知
    
        @property
        def exchange_exchange_bindings_supported(self):
            返回bool值  活动连接是否支持交换以交换绑定
            
        @property
        def publisher_confirms_supported(self):
            返回bool值  活动连接是否可以使用发布者确认
     

     

    BlockingChannel 通道

    创建示例

    import pika
    
    # Create our connection object
    connection = pika.BlockingConnection()
    
    # The returned object will be a synchronous channel
    channel = connection.channel()
     

     

    参数:

    class BlockingChannel(object):
        def __init__(self, channel_impl, connection):
            pass
     

     

        @property
        def channel_number(self):
            """Channel number"""
            频道号码

     

        @property
        def connection(self):
            """The channel's BlockingConnection instance"""
     
        @property
        def is_closed(self):
            是否关闭, 返回bool
     
        @property
        def is_open(self):
            通道是否开启, 返回bool
     
        def close(self, reply_code=0, reply_text="Normal shutdown"):
            关闭
     
        def flow(self, active):
        关闭和打开通道流量控制。   active(bool) - 打开流程(True)或关闭(False)
     
        def add_on_cancel_callback(self, callback):
        	一个回调函数,该函数将在代理发送Basic.Cancel时调用
      		 callback -:callback(method_frame)其中method_frame类型
       		是pika.frame.Method类型的方法spec.Basic.Cancel
     
        def add_on_return_callback(self, callback):
        		回调函数,该函数将在发布的消息被拒绝并由服务器通过Basic.Return返回时调用
       		 callback(callable) - 使用callback(channel,method,properties,body),
       		 其中channel:pika.Channel方法:
       		 pika.spec.Basic.Return属性:pika.spec.BasicProperties body:bytes
     

     

        def basic_consume(self,queue, on_message_callback, auto_ack=False, 
        								exclusive=False, consumer_tag=None, arguments=None):
        
        		queue(str) - 要使用的队列
    		    on_message_callback(可调用) -用于将消息分派给用户的必需函数,定义:		
    		                    on_message_callback(channel,	method,properties,body)
    
    			channel:BlockingChannel方法:spec.Basic.Deliver属性:
    							spec.BasicProperties body:bytes
    			auto_ack(bool) - 如果设置为True,将使用自动确认模式。
    			exclusive(bool) - 不允许队列中的其他消费者
    			consumer_tag(str) - 您可以指定自己的消费者标签; 如果留空,将自动生成消费者标签
    			arguments(dict) - 消费者的自定义键/值对参数
     
    
        def basic_cancel(self, consumer_tag):
        取消消费者
     

     

        def start_consuming(self):
       			 处理I / O事件并调度计时器和basic_consume 回调,直到取消所有使用者
     

     

        def stop_consuming(self, consumer_tag=None):
       		 取消所有使用者
     
    • 1
    • 2
        def consume(self,queue,auto_ack=False,exclusive=False, arguments=None, 
        						inactivity_timeout=None):
        		阻止队列消耗而不是通过回调。此方法是一个生成器,
        		它将每条消息都生成为方法,属性和正文的元组。当客户通过BlockingChannel.cancel()
        		或代理取消使用者时,活动生成器迭代器终止。
        
    	    
    		参数:	
    			queue(str) - 要使用的队列名称
    			auto_ack(bool) - 告诉代理不要期待ack / nack响应
    			exclusive(bool) - 不允许队列中的其他消费者
    			arguments(dict) - 消费者的自定义键/值对参数
    			inactivity_timeout(float) - 如果给出一个数字(以秒为单位),将导致该方法在给定的
    			不活动时间后产生(None,None,None); 这允许用户在等待消息到达时执行伪常规维护活动。
    			如果给出 None(默认),则该方法将阻塞,直到下一个事件到达
     

     

        def get_waiting_message_count(self):
        返回可以通过BlockingChannel.consume从当前队列使用者生成器检索而不阻塞的消息数
     

     

        def cancel(self):
     
        def basic_ack(self, delivery_tag=0, multiple=False):
        
        	确认一条或多条消息。当客户端发送时,此方法确认通过Deliver或Get-Ok方法传递的一条或多条消息。
        	当由服务器发送时,此方法确认在确认模式下在通道上使用“发布”方法发布的一条或多条消息。
        	确认可以是针对单个消息或一组消息,包括特定消息。
    
    		参数:	
    			delivery-tag(int) - 服务器分配的传递标记
    			multiple(bool) - 如果设置为True,则将传递标记视为“最多并包含”,
    			以便可以使用单个方法确认多个消息。
    			如果设置为False,则传递标记引用单个邮件。如果多个字段为1,并且传递标记为零,
    			则表示确认所有未完成的消息。
     

     

        def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
        		方法允许客户端拒绝一个或多个传入消息。它可用于中断和取消大量传入消息,
        		或将无法处理的消息返回到其原始队列。
    
    	参数:	
    			delivery-tag(int) - 服务器分配的传递标记
    			multiple(bool) - 如果设置为True,则将传递标记视为“最多并包含”,
    			以便可以使用单个方法确认多个消息。
    			如果设置为False,则传递标记引用单个邮件。如果多个字段为1,并且传递标记为零,
    			则表示确认所有未完成的消息。
    			requeue(bool) - 如果requeue为true,服务器将尝试重新排队该消息。
    			如果requeue为false或重新排队尝试失败,则丢弃或删除消息。
     

     

        def basic_get(self, queue, auto_ack=False):
        	从AMQP代理获取单个消息
    
    	参数:	
    		queue(str) - 从中​​获取消息的队列名称
    		auto_ack(bool) - 告诉经纪人不要期待回复
     

     

        def basic_publish(self,exchange, routing_key, body, properties=None, mandatory=False):
       	 参数:	
    			exchange(str) - 要发布的交流
    			routing_key(str) - 要绑定的路由键
    			body(字节) - 消息体; 如果没有身体,空字符串
    			properties(pika.spec.BasicProperties) - 消息属性
    			mandatory(bool) - 强制性标志
     

     

     

        def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
        		指定服务质量
        参数:	
    			prefetch_size(int) - 该字段指定预取窗口大小。如果服务器的大小等于或小于可用的
    			预取大小(并且也属于其他预取限制),则它将提前发送消息。
    			可以设置为零,意味着“没有特定限制”,
    			尽管其他预取限制可能仍然适用。如果在使用者中设置了no-ack选项,则忽略prefetch-size。
    			prefetch_count(int) - 根据整个消息指定预取窗口。该字段可以与预取大小字段结合使用; 
    			如果预取窗口(以及通道和连接级别的窗口)都允许,则只会提前发送消息。
    			如果在使用者中设置了no-ack选项,则忽略prefetch-count。
    			global_qos(bool) - QoS是否适用于频道上的所有消费者
     

     

     

        def basic_recover(self, requeue=False):
        此方法要求服务器重新传送指定通道上的所有未确认消息。可以重新传递零个或多个消息。
        此方法替换异步Recover
     

     

        def basic_reject(self, delivery_tag=None, requeue=True):
        	拒绝传入的消息。此方法允许客户端拒绝邮件。它可用于中断和取消大量传入消息,
        	或将无法处理的消息返回到其原始队列。
    
    	参数:	
    		delivery-tag(int) - 服务器分配的传递标记
    		requeue(bool) - 如果requeue为true,服务器将尝试重新排队该消息。
    		如果requeue为false或重新排队尝试失败,则丢弃或删除消息。
     

     

        def confirm_delivery(self):
        	启用RabbitMQ专有的确认模式
     

     

        def exchange_declare(self,exchange,exchange_type='direct',passive=False,
        	durable=False,auto_delete=False,internal=False,arguments=None):
        
        声明交换机
    	 	exchange(str) - 交换名称由这些字符的非空序列组成:
    	 						字母,数字,连字符,下划线,句点或冒号。
    		exchange_type(str) - 要使用的交换类型
    		passive(bool) - 执行声明或只是检查它是否存在
    		durable(bool) - 重启RabbitMQ
    		auto_delete(bool) - 当不再绑定队列时删除
    		internal(布尔) - 只能由其他交易所发布
    		arguments(dict) - 交换的自定义键/值对参数
     

     

        def exchange_delete(self, exchange=None, if_unused=False):
    	交换机删除
     

     

        def exchange_bind(self, destination, source, routing_key='',arguments=None):
    	交换机绑定
    		
    	destination(str) - 要绑定的目标交换
    	source(str) - 要绑定的源交换
    	routing_key(str) - 要绑定的路由键
    	arguments(dict) - 绑定的自定义键/值对参数
     
    	
        def exchange_unbind(self,destination=None,source=None,routing_key='',
        arguments=None):
        	取消绑定
     

     

        def queue_declare(self,queue, passive=False, durable=False, exclusive=False,
        					auto_delete=False, arguments=None):
        声明队列,
       		 queue(str) - 队列名称; 如果为空字符串,则代理将创建唯一的队列名称
    		passive(bool) - 只检查队列是否存在,如果不存在则引发 ChannelClosed
    		durable(bool) - 经纪人重新开始
    		exclusive(bool) - 仅允许当前连接访问
    		auto_delete(bool) - 消费者取消或断开连接后删除
    		arguments(dict) - 队列的自定义键/值参数
     

     

        def queue_delete(self, queue, if_unused=False, if_empty=False):
        删除队列
     

     

        def queue_purge(self, queue):
        清除指定队列中的所有消息   queue清除的队列的名称
     

     

        def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
    		将队列绑定到指定的交换
    	
    	参数:	
    		queue(str) - 绑定到交换的队列
    		exchange(str) - 要绑定的源交换
    		routing_key(str) - 要绑定的路由键
    		arguments(dict) - 绑定的自定义键/值对参数
     

     

        def queue_unbind(self,queue,exchange=None,routing_key=None,arguments=None):
    		从交换中取消绑定队列
    		queue(str) - 从交换中取消绑定的队列
    		exchange(str) - 要绑定的源交换
    		routing_key(str) - 解除绑定的路由键
    		arguments(dict) - 绑定的自定义键/值对参数
     

     

        def tx_select(self):
    	选择标准交易模式
        def tx_commit(self):
    	事务提交
        def tx_rollback(self):
        事务回滚
     
     
  • 相关阅读:
    在TreeView控件节点中显示图片
    PAT 甲级 1146 Topological Order (25 分)
    PAT 甲级 1146 Topological Order (25 分)
    PAT 甲级 1145 Hashing
    PAT 甲级 1145 Hashing
    PAT 甲级 1144 The Missing Number (20 分)
    PAT 甲级 1144 The Missing Number (20 分)
    PAT 甲级 1151 LCA in a Binary Tree (30 分)
    PAT 甲级 1151 LCA in a Binary Tree (30 分)
    PAT 甲级 1149 Dangerous Goods Packaging
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/13856687.html
Copyright © 2011-2022 走看看