zoukankan      html  css  js  c++  java
  • 消息队列RabbitMQ的python接口使用

    消息队列是一种常用的开发中间件,适用于异步、分布式、解耦合等业务场景中,而RabbitMQ是其中一种常用的消息队列,今天来总结一下RabbitMQ在python端的使用方法。

    1.发送接收基础

    python一般使用pika库来操作RabbitMQ,需要先用pip安装。

    #1 建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 建立一个连接
    channel = connection.channel()               # 建立此连接下的一个频道
    channel.queue_declare(queue='hello')         # 声明一个队列
    
    #2 发送消息
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    
    #3 接收消息
    channel.basic_consume(on_message_callback=callback, queue='hello')  # 声明消息处理回调函数
    channel.start_consuming()                                           # 开始消费消息,并进入死循环
    
    def callback(channel, method, properties, body):
        print("Received %r" % (body,))                         # body是消息体,根据具体业务进行解析处理
        time.sleep(5)                                          # 模拟处理消息
        channel.basic_ack(delivery_tag = method.delivery_tag)  # 处理完成后,发送ack进行消息确认,消息在服务端安全删除
    
    #4 关闭连接
    connection.close()
    

    2.完整接口示例

    import time
    import random
    import pika
    from pika.exceptions import ChannelClosed, ConnectionClosed
    
    # rabbitmq 配置信息
    MQ_CONFIG = {
        "hostname": "127.0.0.1",
        "port": 8080,
        "vhost": "/",
        "username": "guest",
        "password": "guest",
        "exchange": "exchange",
        "queue": "queue",
        "routing_key": "key"
    }
    
    # 消息队列基类
    class RabbitMQServer(object):
        def __init__(self):
            self.config = MQ_CONFIG                           # 配置文件加载
            self.host = self.config.get("hostname")           # 主机
            self.port = self.config.get("port")               # 端口
            self.username = self.config.get("username")       # 用户名
            self.password = self.config.get("password")       # 密码
            self.vhost = self.config.get("vhost")             # 虚拟主机,VirtualHost之间相互隔离
            self.exchange = self.config.get("exchange")       # 交换机
            self.queue = self.config.get("queue")             # 队列
            self.routing_key = self.config.get("routing_key") # 交换机和队列的绑定
    
            self.connection = None
            self.channel = None
    
        def reconnect(self, heartbeat=True):
            try:
                # 关闭旧的连接
                if self.connection and not self.connection.is_closed:
                    self.connection.close()
    
                # 构造登录参数
                credentials = pika.PlainCredentials(self.username, self.password)
                parameters = pika.ConnectionParameters(self.host, self.port, self.vhost, credentials)         
                                  
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                # 声明交换机
                self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct", durable=True)  
    
                # 消费者实例
                if isinstance(self, RabbitComsumer):
                    self.channel.queue_declare(queue=self.queue, exclusive=True, durable=True)
                    # 绑定队列
                    self.channel.queue_bind(exchange=self.exchange, queue=self.queue, routing_key=self.routing_key)  
                    # 表明最大阻塞未ack的消息数量
                    self.channel.basic_qos(prefetch_count=1)                                                         
                    self.channel.basic_consume(on_message_callback=self.consumer_callback, queue=self.queue, auto_ack=False)
                # 生产者实例
                elif isinstance(self, RabbitPublisher):
                    self.channel.queue_declare(queue=self.queue, exclusive=False, durable=True)
                
            except Exception as e:
                print(e)
    
    def excecute(body):
        pass
    
    # 消费者
    class RabbitComsumer(RabbitMQServer):
        def __init__(self):
            super(RabbitComsumer, self).__init__()
    
        def consumer_callback(self, channel, method, properties, body):
            result = execute(body)  # 模拟处理消息
            if channel.is_open:
                if result:
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                else:
                    # 处理不成功时,发送no_ack
                    channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True) 
            if not channel.is_open:
                print("Callback 接收频道关闭,无法ack")
    
        def start_consumer(self):
            while True:
                try:
                    self.reconnect()
                    self.channel.start_consuming()
                except ConnectionClosed as e:        # 保证连接断开重连
                    self.reconnect()
                    time.sleep(2)
                except ChannelClosed as e:           # 保证连接断开重连
                    self.reconnect()
                    time.sleep(2)
                except Exception as e:
                    self.reconnect()
                    time.sleep(2)
    
    # 生产者
    class RabbitPublisher(RabbitMQServer):
        def __init__(self):
            super(RabbitPublisher, self).__init__()
    
        def start_publish(self):
            self.reconnect()
            i = 1
            while True:
                message = {"value": i}
                try:
                    self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, body=message)
                    i += 1
                    time.sleep(2)
                except ConnectionClosed as e:
                    self.reconnect()
                    time.sleep(2)
                except ChannelClosed as e:
                    self.reconnect()
                    time.sleep(2)
                except Exception as e:
                    self.reconnect()
                    time.sleep(2)
    
  • 相关阅读:
    PHP基础学习笔记(一)
    安装wampserver之后,浏览器中输入localhost页面显示IIS7解决办法
    HTML5常识总结(一)
    AngularJs中的服务
    AngularJs中的directives(指令part1)
    Happy Number——LeetCode
    Binary Tree Zigzag Level Order Traversal——LeetCode
    Construct Binary Tree from Preorder and Inorder Traversal——LeetCode
    Construct Binary Tree from Inorder and Postorder Traversal——LeetCode
    Convert Sorted Array to Binary Search Tree——LeetCode
  • 原文地址:https://www.cnblogs.com/qxcheng/p/12334880.html
Copyright © 2011-2022 走看看