zoukankan      html  css  js  c++  java
  • 一种基于Python Pika库的RabbitMQ Client简单封装

    代码

    Github地址:https://github.com/HanseyLee/RabbitMQClient

    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    import pika
    import hashlib
    import json
    
    def getMd5(input_str):
        """
        Return the hexadecimal MD5 digest of a message.

        Text is encoded as UTF-8 before hashing; bytes-like input (for example
        a raw message body that has not been decoded yet) is hashed as-is, so
        a ``str`` and its UTF-8 encoding produce the same digest.

        :param input_str: message to hash
        :type input_str: str or bytes or bytearray
        :rtype: str
        """
        if isinstance(input_str, (bytes, bytearray)):
            # Already encoded: calling .encode() on bytes would raise AttributeError.
            data = bytes(input_str)
        else:
            data = input_str.encode("utf-8")
        return hashlib.md5(data).hexdigest()
    
    class RabbitMQClient:
        """Thin convenience wrapper around a pika ``BlockingConnection``.

        Defaults: exchange type is 'topic', routing/binding key is '#', dead
        letter exchange is 'DLX' and dead letter queue is 'DLQ'.
        """
        __default_exchange_type = "topic"
        # In topic bindings, # (hash) can substitute for zero or more words and
        # * (star) can substitute for exactly one word.
        __default_routing_key = "#"
        __default_DeadLetterExchange = "DLX"
        __default_DeadLetterQueue = "DLQ"

        def __init__(self, username, password, host, port=5672):
            """
            Open a blocking connection and a single channel on it.

            :param str username: broker user name
            :param str password: broker password
            :param host: broker host, converted with ``str()``
            :param port: broker port, converted with ``int()``
            """
            self.host = str(host)
            self.port = int(port)
            # heartbeat=0 deactivates heartbeats for this connection.
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=pika.PlainCredentials(username, password),
                heartbeat=0,
            )
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()

        #
        # basic operations
        #

        def close_connection(self):
            """Close the underlying connection."""
            self.connection.close()

        def declare_exchange(self, exchange, exchange_type=__default_exchange_type):
            """
            Declare a durable exchange.

            :param str exchange: exchange name
            :param str exchange_type: exchange type, 'topic' by default
            """
            self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)

        def delete_exchange(self, exchange):
            """Delete the exchange named ``exchange``."""
            self.channel.exchange_delete(exchange=exchange)

        def declare_queue(self, queue):
            """Declare a durable queue named ``queue``."""
            self.channel.queue_declare(queue=queue, durable=True)

        def declare_queue_dlx(self, queue, dlx=__default_DeadLetterExchange):
            """
            Declare a durable queue whose dead-lettered messages go to ``dlx``.

            :param str queue: queue name
            :param str dlx: name of the dead letter EXCHANGE; it is sent as the
                ``x-dead-letter-exchange`` argument, so the default is the
                default exchange name ('DLX'), not the dead letter queue name
            """
            self.channel.queue_declare(queue=queue, durable=True, arguments={'x-dead-letter-exchange': dlx})

        def declare_queue_ttl(self, queue, ttl_seconds):
            """
            Declare a durable queue with a per-queue message TTL.

            :param str queue: queue name
            :param ttl_seconds: message time-to-live in seconds (int or float)
            """
            # RabbitMQ interprets x-message-ttl in milliseconds and requires an
            # integer, so convert from the seconds this method accepts.
            ttl_milliseconds = int(round(ttl_seconds * 1000))
            self.channel.queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_milliseconds})

        def delete_queue(self, queue):
            """Delete the queue named ``queue``."""
            self.channel.queue_delete(queue=queue)

        def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key):
            """
            Bind ``queue`` to ``exchange``.

            :param str queue: queue name
            :param str exchange: exchange name
            :param str binding_key: binding key, '#' by default
            """
            self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)

        #
        # combined operations
        #

        def declare_dlx_dlq(self, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue):
            """
            Declare a fanout dead letter exchange, a dead letter queue, and bind them.

            :param str dlx: dead letter exchange
            :param str dlq: dead letter queue
            """
            self.declare_exchange(exchange=dlx, exchange_type='fanout')
            self.declare_queue(queue=dlq)
            self.bind_exchange_queue(exchange=dlx, queue=dlq)

        def publish(self, message, exchange, queue, routing_key, message_id=None,
                    close_connection=True):
            """
            Publish a persistent message (delivery_mode=2) carrying a message_id.

            The queue is declared (durable) before publishing.

            :param message: message body
            :param str exchange: exchange to publish to
            :param str queue: queue to declare before publishing
            :param str routing_key: routing key of the message
            :param message_id: message id; when None, the MD5 hex digest of
                ``message`` is used
            :param bool close_connection: when true (the default), the
                connection is closed after publishing, so the client cannot be
                reused; pass False to publish several messages
            """
            if message_id is None:
                message_id = getMd5(input_str=message)
            self.declare_queue(queue=queue)
            properties = pika.BasicProperties(
                delivery_mode=2, message_id=message_id, content_type="application/json")
            self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
                                       properties=properties)
            if close_connection:
                self.close_connection()

        def consume(self, callback, queue, dlx=__default_DeadLetterExchange, dlq=__default_DeadLetterQueue,
                    exclusive=False, consumer_tag=None, **kwargs):
            """
            Declare the dead letter exchange/queue, then consume ``queue`` until interrupted.

            This call blocks in ``start_consuming()``. On ``KeyboardInterrupt``
            consuming is stopped and the connection is closed.

            :param callback: passed to ``basic_consume`` as ``on_message_callback``
            :param str queue: queue to consume from (not declared here)
            :param str dlx: dead letter exchange to declare
            :param str dlq: dead letter queue to declare
            :param bool exclusive: forwarded to ``basic_consume``
            :param consumer_tag: forwarded to ``basic_consume``
            :param kwargs: extra keyword arguments forwarded to ``basic_consume``
            """
            self.declare_dlx_dlq(dlx=dlx, dlq=dlq)
            self.channel.basic_consume(queue=queue, on_message_callback=callback, exclusive=exclusive,
                                       consumer_tag=consumer_tag, **kwargs)
            try:
                self.channel.start_consuming()
            except KeyboardInterrupt:
                self.channel.stop_consuming()
                self.close_connection()

        @staticmethod
        def ack_message(channel, method):
            """Acknowledge the delivery identified by ``method.delivery_tag``."""
            channel.basic_ack(delivery_tag=method.delivery_tag)

        @staticmethod
        def reject_to_dlx(channel, method):
            """
            Reject the delivery without requeueing it.

            The queue the message is consumed from needs the dead letter
            exchange property for the rejected message to reach a DLX.
            """
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

        @staticmethod
        def transmit(channel, method, properties, message, exchange=__default_DeadLetterExchange,
                     routing_key=__default_routing_key, queue=__default_DeadLetterQueue, handler=None):
            """
            Republish a consumed message to ``exchange`` and acknowledge the original.

            :param channel: channel used both to publish and to ack
            :param method: delivery method frame of the consumed message
            :param properties: properties of the consumed message; its
                ``message_id`` is reused when set
            :param message: message body to forward
            :param str exchange: target exchange, 'DLX' by default
            :param str routing_key: routing key, '#' by default
            :param str queue: not used by this method; kept so existing callers
                that pass it keep working
            :param handler: optional callable applied to ``message`` before
                publishing; its return value becomes the published body
            """
            if handler is not None:
                message = handler(message)
            message_id = properties.message_id
            if message_id is None:
                # Fall back to a digest of the (possibly handled) body.
                message_id = getMd5(input_str=message)
            forwarded_properties = pika.BasicProperties(
                delivery_mode=2, message_id=message_id, content_type="application/json")
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
                                  properties=forwarded_properties)
            channel.basic_ack(delivery_tag=method.delivery_tag)
    #
    ### Testing
    #
    def callback(ch, method, properties, body):
        """Demo consumer callback: print delivery metadata, then acknowledge the message."""
        details = (method.consumer_tag, method.routing_key, properties.message_id)
        print("consumer_tag %r, consume_func %r, %r" % details)
        # Alternative to a plain ack: forward the message through transmit(), e.g.
        # RabbitMQClient.transmit(channel=ch, method=method, properties=properties, message=str(body, 'utf-8'), handler=handler)
        RabbitMQClient.ack_message(channel=ch, method=method)
    
    def handler(input_str):
        """Demo message handler: return ``input_str`` with a fixed marker prepended."""
        marker = "hadled"
        return marker + input_str
    
    if __name__ == "__main__":
        # Placeholder settings: replace every 'xxx' with real broker values.
        # Constructing the client opens the connection immediately.
        connection_settings = {
            "username": "xxx",
            "password": "xxx",
            "host": "xxx",
            "port": 5672,
        }
        mqc = RabbitMQClient(**connection_settings)
        queue = "DLQ"
        msg = json.dumps({"a": "aaa"})
        # Uncomment one of the following lines to exercise the client:
        # mqc.publish(message=msg, exchange='', routing_key=queue, queue=queue)
        # mqc.consume(callback=callback, queue=queue, consumer_tag='consumer-1')
        print("==done==")
    

    参考

    https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection
    https://www.rabbitmq.com/tutorials/tutorial-five-python.html

    注意

    • Connection

      • A real TCP connection to the message broker. It is designed to be long-lived.
      • 设置connection属性heartbeat=0, deactivate heartbeat,这样连接就不会超时,一直保持。
    • Channel

      • A virtual connection (AMQP connection) inside a Connection, designed to be transient.
      • 一个Channel下设置一个consumer
        • Channel instances must not be shared between threads. Channels are not generally thread-safe as it would make no sense to share them among threads. If you have another thread that needs to use the broker, a new channel is needed.
    • Exchange, Routing_key, Queue

      • Exchange --- Routing_key ---> Queue
      • topic形式的Exchange几乎可以模拟其他的所有模式。
        • * (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.
        • When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one => direct模式
        • When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key => like in fanout exchange
      • Queue 可以设置TTL(Time To Live)属性,设置一定时间后,消息自动移除队列。注意 x-message-ttl 的单位为毫秒(不是秒)。
        • queue_declare(queue=queue, durable=True, arguments={'x-message-ttl': ttl_seconds * 1000})
    • Publisher

      • 发布、消费消息之前都需要声明所需的Exchange Queue。
      • 推荐每个消息都设置其 message_id 属性,方便后续业务追踪、打点等。
    • Consumer

      • 多消费者 Round-Robin(RR)公平调度消费

        • Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue (in different channels), the broker uses round-robin to distribute the messages between them equally.
      • 对于消费者,最好在消费消息前指定死亡信件交换器和死信队列(DLX:dead letter exchange, DLQ:dead letter queue)。在消费过程中,不能够处理或处理出现异常的消息可以转发至DLX和DLQ。在转发前也可以对消息进行特定的处理和包装。(如果声明队列的时候指定了DLX属性,如arguments={'x-dead-letter-exchange': dlx}, 消费者在消费时可以直接reject消息,被拒绝的消息会直接到DLX, 这样的好处是不用自己写转发逻辑,缺点是不够灵活,不能够对消息进行处理和包装。)

    <全文完>

    注:内容同步自同名CSDN博客:https://blog.csdn.net/fzlulee/article/details/98480724

  • 相关阅读:
    相邻相邻问题
    修改sqlserver2008数据库的排序规则 (转)
    备份删除
    SQL Server 2005系统数据库master重建、修复(转)
    SQL SERVER 2008 重建损坏的master (转)
    深入理解JavaScript作用域和作用域链
    js词法分析
    利用 Console 来学习、调试JavaScript
    expression解决IE6下固定定位的兼容
    浏览器事件的思考
  • 原文地址:https://www.cnblogs.com/HanseyLee/p/11310418.html
Copyright © 2011-2022 走看看