zoukankan      html  css  js  c++  java
  • 消息队列RabbitMQ的python接口使用

    消息队列是一种常用的开发中间件,适用于异步、分布式、解耦合等业务场景中,而RabbitMQ是其中一种常用的消息队列,今天来总结一下RabbitMQ在python端的使用方法。

    1.发送接收基础

    python一般使用pika库来操作RabbitMQ,需要先用pip安装。

    #1 建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 建立一个连接
    channel = connection.channel()               # 建立此连接下的一个频道
    channel.queue_declare(queue='hello')         # 声明一个队列
    
    #2 发送消息
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    
    #3 接收消息
    channel.basic_consume(on_message_callback=callback, queue='hello')  # 声明消息处理回调函数
    channel.start_consuming()                                           # 开始消费消息,并进入死循环
    
    def callback(channel, method, properties, body):
        print("Received %r" % (body,))                         # body是消息体,根据具体业务进行解析处理
        time.sleep(5)                                          # 模拟处理消息
        channel.basic_ack(delivery_tag = method.delivery_tag)  # 处理完成后,发送ack进行消息确认,消息在服务端安全删除
    
    #4 关闭连接
    connection.close()
    

    2.完整接口示例

    import time
    import random
    import pika
    from pika.exceptions import ChannelClosed, ConnectionClosed
    
    # rabbitmq 配置信息
    MQ_CONFIG = {
        "hostname": "127.0.0.1",
        "port": 8080,
        "vhost": "/",
        "username": "guest",
        "password": "guest",
        "exchange": "exchange",
        "queue": "queue",
        "routing_key": "key"
    }
    
    # 消息队列基类
    class RabbitMQServer(object):
        def __init__(self):
            self.config = MQ_CONFIG                           # 配置文件加载
            self.host = self.config.get("hostname")           # 主机
            self.port = self.config.get("port")               # 端口
            self.username = self.config.get("username")       # 用户名
            self.password = self.config.get("password")       # 密码
            self.vhost = self.config.get("vhost")             # 虚拟主机,VirtualHost之间相互隔离
            self.exchange = self.config.get("exchange")       # 交换机
            self.queue = self.config.get("queue")             # 队列
            self.routing_key = self.config.get("routing_key") # 交换机和队列的绑定
    
            self.connection = None
            self.channel = None
    
        def reconnect(self, heartbeat=True):
            try:
                # 关闭旧的连接
                if self.connection and not self.connection.is_closed:
                    self.connection.close()
    
                # 构造登录参数
                credentials = pika.PlainCredentials(self.username, self.password)
                parameters = pika.ConnectionParameters(self.host, self.port, self.vhost, credentials)         
                                  
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                # 声明交换机
                self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct", durable=True)  
    
                # 消费者实例
                if isinstance(self, RabbitComsumer):
                    self.channel.queue_declare(queue=self.queue, exclusive=True, durable=True)
                    # 绑定队列
                    self.channel.queue_bind(exchange=self.exchange, queue=self.queue, routing_key=self.routing_key)  
                    # 表明最大阻塞未ack的消息数量
                    self.channel.basic_qos(prefetch_count=1)                                                         
                    self.channel.basic_consume(on_message_callback=self.consumer_callback, queue=self.queue, auto_ack=False)
                # 生产者实例
                elif isinstance(self, RabbitPublisher):
                    self.channel.queue_declare(queue=self.queue, exclusive=False, durable=True)
                
            except Exception as e:
                print(e)
    
    def excecute(body):
        pass
    
    # 消费者
    class RabbitComsumer(RabbitMQServer):
        def __init__(self):
            super(RabbitComsumer, self).__init__()
    
        def consumer_callback(self, channel, method, properties, body):
            result = execute(body)  # 模拟处理消息
            if channel.is_open:
                if result:
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                else:
                    # 处理不成功时,发送no_ack
                    channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True) 
            if not channel.is_open:
                print("Callback 接收频道关闭,无法ack")
    
        def start_consumer(self):
            while True:
                try:
                    self.reconnect()
                    self.channel.start_consuming()
                except ConnectionClosed as e:        # 保证连接断开重连
                    self.reconnect()
                    time.sleep(2)
                except ChannelClosed as e:           # 保证连接断开重连
                    self.reconnect()
                    time.sleep(2)
                except Exception as e:
                    self.reconnect()
                    time.sleep(2)
    
    # 生产者
    class RabbitPublisher(RabbitMQServer):
        def __init__(self):
            super(RabbitPublisher, self).__init__()
    
        def start_publish(self):
            self.reconnect()
            i = 1
            while True:
                message = {"value": i}
                try:
                    self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, body=message)
                    i += 1
                    time.sleep(2)
                except ConnectionClosed as e:
                    self.reconnect()
                    time.sleep(2)
                except ChannelClosed as e:
                    self.reconnect()
                    time.sleep(2)
                except Exception as e:
                    self.reconnect()
                    time.sleep(2)
    
  • 相关阅读:
    系统实践2-2:查看dockerfile-032092135mysql容器的配置信息
    系统综合实践1
    SDN——实验脚本7-2:hardtimeout.json
    SDN——实验脚本7-1:odlnorth.py
    实验 7:OpenDaylight 实验——Python 中的 REST API 调用
    预习非数值数据的编码方式
    预习原码补码
    C语言ll作业01
    C语言寒假大作战04
    C语言寒假大作战03
  • 原文地址:https://www.cnblogs.com/qxcheng/p/12334880.html
Copyright © 2011-2022 走看看