zoukankan      html  css  js  c++  java
  • RabbitMQ 实现削峰填谷

    import threading
    import time
    import pika
    
    
    class SingletonClass(object):
        """单例模式用来少创建连接"""
        # 加锁,防止并发较高时,同时创建对象,导致创建多个对象
        _singleton_lock = threading.Lock()
    
        def __init__(self, username='baibing', password='123456', ip='47.111.87.61', port=5672, data={}):
            """__init__在new出来对象后实例化对象"""
            self.credentials = pika.PlainCredentials(username, password)
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=ip, port=port, credentials=self.credentials))
            self.channel = self.connection.channel()
            print('连接成功')
    
        def __new__(cls):
            """__new__用来创建对象"""
            if not hasattr(SingletonClass, "_instance"):
                with SingletonClass._singleton_lock:
                    if not hasattr(SingletonClass, "_instance"):
                        SingletonClass._instance = super().__new__(cls)
            return SingletonClass._instance
    
        def callback(self, ch, method, properties, body):
            """订阅者的回调函数,可以在这里面做操作,比如释放库存等"""
            print("邮箱", body.decode())
            # 在秒杀活动中,这里来对数据进行平滑的处理
            time.sleep(0.8)
            ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动ack机制,
    
        def connection_close(self):
            """关闭连接"""
            self.connection.close()
    
        def consuming_start(self):
            """等待消息"""
            self.channel.start_consuming()
    
        def this_publisher(self, email, queue_name='HELLOP'):
            """发布者
            email:消息
            queue_name:队列名称
            """
    
            # 1、创建一个名为python-test的交换机 durable=True 代表exchange持久化存储
            self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
            # self.channel.queue_declare(queue=queue_name)
            # 2、订阅发布模式,向名为python-test的交换机中插入用户邮箱地址email,delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
            self.channel.basic_publish(exchange='python1',
                                       routing_key='#user#',
                                       body=email,
                                       properties=pika.BasicProperties(delivery_mode=2)
                                       )
    
            print("队列{}发送用户邮箱{}到MQ成功".format(queue_name, email))
            # 3. 关闭连接
            self.connection_close()
    
        def this_subscriber(self, queue_name='HELLOP', prefetch_count=10):
            """订阅者
            queue_name:队列名称
            prefetch_count:限制未处理消息的最大值,ack未开启时生效
            """
            # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
            result = self.channel.queue_declare('', durable=True, exclusive=True)
            # 限制未处理消息的最大值 这个值就是你数据库承受的并发量
            self.channel.basic_qos(prefetch_count=5)
    
            # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
            self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
    
            # 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
            self.channel.queue_bind(exchange='python1', queue=result.method.queue, routing_key='#.anonymous.#')
    
            self.channel.basic_consume(
                result.method.queue,
                self.callback,  # 回调地址(函数)
                auto_ack=False  # 流量削峰 auto_ack必须为false 手动来ack
            )
            # 等待消息
            self.consuming_start()
    
    
    if __name__ == '__main__':
        obj1 = SingletonClass()
        print(id(obj1))
        obj1.this_subscriber()
    
    

    原文链接:https://blog.csdn.net/qq_42874635/article/details/116268306

    先复制过来稍后整理

  • 相关阅读:
    深入理解javascript的this关键字
    很简单的JQuery网页换肤
    有关垂直居中
    层的半透明实现方案
    常用meta整理
    web前端页面性能优化小结
    关于rem布局以及sprit雪碧图的移动端自适应
    mysql入过的坑
    日期格式化函数
    基于iframe父子页面传值的方法。
  • 原文地址:https://www.cnblogs.com/Jacob-yang/p/15124979.html
Copyright © 2011-2022 走看看