zoukankan      html  css  js  c++  java
  • RabbitMaClientPoll

    import pika
    import threading
    import random
    import uuid
    import json
    
    # 框架模块
    from django.conf import settings
    
    """
    Class:  
    Parameters:
        Connectionsize:int类型,Rabbitmqpool池连接的最大数
        Channelsize:int类型,Rabbitmqpool池Channel的最大数
    return:None
    """
    
    
    # 单例保证唯一
    class Rabbitmqpool:
        # 定义类属性
        __instance = None
        __lock = threading.Lock()
    
        def __init__(self, Connectionsize, Channelsize):
            self.maxConnectionsize = Connectionsize
            self.maxChannelsize = Channelsize
            self.nowConnectionsize = 0
            self.nowChannelsize = 0
            self.connectpool = {}
            self.channelpool = {}
            self.certdic = {}
    
        # def __new__(cls, Connectionsize, Channelsize):
        #     if not cls.__instance:
        #         cls.__instance = object.__new__(cls)
        #     return cls.__instance
    
        """
        function:  获取一个空闲Channel或者新建一个Channel
        Parameters:
    
        return:
            channel:channel
            cname:连接名
        """
    
        def get_channel(self):
            try:
                self.__lock.acquire()
                cname = ""
                channel = None
                # 在已存在键中查找空闲Channel
                for connectionname in self.connectpool:
                    if len(self.channelpool[connectionname]) != 0:
                        # print("取出一个Channel -----------------", len(self.channelpool[connectionname]))
                        channel = self.channelpool[connectionname][-1]
                        cname = connectionname
                        self.channelpool[connectionname] = self.channelpool[connectionname][0:-1]
                        # print("取出一个Channel")
                        break
                # 如果没有找到空闲Channel,canme为"",则新建一个Channel
                if cname == "":
                    if self.nowChannelsize < self.maxChannelsize:
                        # 从连接池返回一个连接的名字
                        if len(self.connectpool) != 0:
                            cname = random.choice(list(self.connectpool))
                            # 根据名字拿到此连接,传入连接和Pool池创建Channel
                            CreateChannel(self.connectpool[cname], self)
                            # 得到一个新Channel
                            channel = self.channelpool[cname][-1]
                            self.channelpool[cname] = self.channelpool[cname][0:-1]
                            print("创建一个Channel")
                        # 如果没有连接,则新建连接与channel
                        else:
                            if len(self.certdic) != 0:
                                cert = random.choice(list(self.certdic))
                                cname = str(uuid.uuid4().int)
                                print("创建一个连接")
                                CreateConnection(str(self.certdic[cert]["rabbitmq_host"]),
                                                 str(self.certdic[cert]["rabbitmq_port"]),
                                                 str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                                 str(self.certdic[cert]["rabbitmq_user"]),
                                                 str(self.certdic[cert]["rabbitmq_password"]), self, cname,
                                                 self.certdic[cert]["heartbeat"])
                                CreateChannel(self.connectpool[cname], self)
                                # 得到一个新Channel
                                channel = self.channelpool[cname][-1]
                                self.channelpool[cname] = self.channelpool[cname][0:-1]
                                print("创建一个Channel")
                            else:
                                print("无法创建Channel,无连接凭证,不能创建连接!")
                    else:
                        print("无法创建Channel,超过限制")
    
            finally:
                self.__lock.release()
            return channel, cname
    
        def create_channel(self):
            try:
                self.__lock.acquire()
                if len(self.certdic) != 0:
                    cert = random.choice(list(self.certdic))
                    cname = str(uuid.uuid4().int)
                    print("创建一个连接")
                    CreateConnection(str(self.certdic[cert]["rabbitmq_host"]), str(self.certdic[cert]["rabbitmq_port"]),
                                     str(self.certdic[cert]["rabbitmq_virtual_host"]),
                                     str(self.certdic[cert]["rabbitmq_user"]),
                                     str(self.certdic[cert]["rabbitmq_password"]), self, cname,
                                     self.certdic[cert]["heartbeat"])
                    CreateChannel(self.connectpool[cname], self)
                    # 得到一个新Channel
                    channel = self.channelpool[cname][-1]
                    self.channelpool[cname] = self.channelpool[cname][0:-1]
                    print("创建一个Channel")
                    return channel, cname
                else:
                    print("无法创建Channel,无连接凭证,不能创建连接!")
                return None, ""
            finally:
                self.__lock.release()
    
        def return_channel(self, channel, connectionname):
            try:
                self.__lock.acquire()
                # print('还回去 return_channel')
                self.channelpool[connectionname].append(channel)
                # print('还回去 return_channel------------', len(self.channelpool[connectionname]))
            finally:
                self.__lock.release()
    
        def closepool(self):
            pass
    
        def delconnection(self, connectionname):
            try:
                self.__lock.acquire()
                if connectionname in self.connectpool:
                    print('删除链接connectionname', connectionname)
                    del self.connectpool[connectionname]
    
                    self.nowConnectionsize = self.nowConnectionsize - 1
                    self.nowChannelsize = self.nowChannelsize - len(self.channelpool[connectionname])
                    print('删除connectionname', self.nowChannelsize)
                    del self.channelpool[connectionname]
    
            finally:
                self.__lock.release()
    
        def get_certtemplate(self):
            return {"rabbitmq_host": "", "rabbitmq_port": 5672, "rabbitmq_virtual_host": "", "rabbitmq_user": "",
                    "rabbitmq_password": "", "heartbeat": 6000}
    
        def addcert(self, cert):
            self.certdic[cert["rabbitmq_host"]] = cert
    
    
    # 连接可以自己创建
    class CreateConnection:
        def __init__(self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password,
                     Rabbitmqpool, Connectionname=str(uuid.uuid4().int), heartbeat=6000):
            if Rabbitmqpool.nowConnectionsize < Rabbitmqpool.maxConnectionsize:
                if Connectionname not in Rabbitmqpool.connectpool:
                    self.rabbitmq_user = str(rabbitmq_user)
                    self.rabbitmq_password = str(rabbitmq_password)
                    self.rabbitmq_host = rabbitmq_host
                    self.rabbitmq_port = rabbitmq_port
                    self.rabbitmq_virtual_host = rabbitmq_virtual_host
                    self.connectionname = Connectionname
                    self.heartbeat = heartbeat
                    # print(self.rabbitmq_user, self.rabbitmq_password, self.rabbitmq_host, self.rabbitmq_port,
                    #      self.rabbitmq_virtual_host, self.connectionname)
                    credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
                    try:
                        self.connection = pika.BlockingConnection(
                            pika.ConnectionParameters(
                                host=rabbitmq_host,
                                port=rabbitmq_port,
                                virtual_host=rabbitmq_virtual_host,
                                heartbeat=heartbeat,
                                credentials=credentials))
                        Rabbitmqpool.connectpool[Connectionname] = self
                        Rabbitmqpool.nowConnectionsize += 1
                        if self.connectionname not in Rabbitmqpool.channelpool:
                            Rabbitmqpool.channelpool[self.connectionname] = []
                        print("创建连接:", Connectionname)
                    except Exception as e:
                        print("创建连接失败:", e)
                else:
                    print("创建连接失败,此连接名已存在:", Connectionname)
            else:
                print("创建连接失败,连接池已满,无法创建连接池")
    
        def get_connection(self):
            return self.connection
    
    
    class CreateChannel:
        def __init__(self, Connection, Rabbitmqpool):
            # print('创建 CreateChannel')
            Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel())
            Rabbitmqpool.nowChannelsize += 1
    
    
    class RabbitMaClientPoll:
        rabbitmq_host = settings.RABBIT_HOST
        rabbitmq_port = 5672
        rabbitmq_user = settings.RABBIT_USERNAME
        rabbitmq_password = settings.RABBIT_PASSWORD
        rabbitmq_virtual_host = "/"
        credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
        Pool = Rabbitmqpool(100, 100)
        cert = Pool.get_certtemplate()
        cert['rabbitmq_host'] = rabbitmq_host
        cert['rabbitmq_virtual_host'] = rabbitmq_virtual_host
        cert['rabbitmq_user'] = rabbitmq_user
        cert['rabbitmq_password'] = rabbitmq_password
        cert['rabbitmq_port'] = rabbitmq_port
        cert['heartbeat'] = 6000
        Pool.addcert(cert)
        instance = None
    
        def __init__(self):
            cname_list = []
            for i in range(100):
                c, cname = self.Pool.create_channel()
                cname_list.append((c, cname))
                print('---', len(cname_list))
            for item in cname_list:
                c, cname = item
                self.Pool.return_channel(c, cname)
    
        def __new__(cls, *args, **kwargs):
            if cls.instance:
                return cls.instance
            else:
                return super().__new__(cls)
    
        def producer(self, data):
            """
    
            :param data:
            :return:
            """
            data = json.dumps(data)
    
            try:
                c, cname = self.Pool.get_channel()
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC,
                                body=data, )
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_ES,
                                body=data, )
    
                self.Pool.return_channel(c, cname)
            except Exception as e:
                print("发送错误:", e)  # 链接过期
                self.Pool.delconnection(cname)  # channel过期时,删除此链接和此链接下的所有channel
                c, cname = self.Pool.create_channel()  # 创建一个新的链接和channel
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC,
                                body=data, )
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_ES,
                                body=data, )
                self.Pool.return_channel(c, cname)
    
        def producer_message(self, data):
            """
    
            :param data:
            :return:
            """
            data = json.dumps(data)
    
            try:
                c, cname = self.Pool.get_channel()
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_MESSAGE,
                                body=data, )
    
                self.Pool.return_channel(c, cname)
            except Exception as e:
                print("发送错误:", e)  # 链接过期
                self.Pool.delconnection(cname)  # channel过期时,删除此链接和此链接下的所有channel
                c, cname = self.Pool.create_channel()  # 创建一个新的链接和channel
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_MESSAGE,
                                body=data, )
                self.Pool.return_channel(c, cname)
    
    
    class RabbitMaClientPoll2(object):
        rabbitmq_host = settings.RABBIT_HOST
        rabbitmq_port = 5672
        rabbitmq_user = settings.RABBIT_USERNAME
        rabbitmq_password = settings.RABBIT_PASSWORD
        rabbitmq_virtual_host = "/"
        credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
        Pool = Rabbitmqpool(10, 5)
        cert = Pool.get_certtemplate()
        cert['rabbitmq_host'] = rabbitmq_host
        cert['rabbitmq_virtual_host'] = rabbitmq_virtual_host
        cert['rabbitmq_user'] = rabbitmq_user
        cert['rabbitmq_password'] = rabbitmq_password
        cert['rabbitmq_port'] = rabbitmq_port
        cert['heartbeat'] = 3600
    
        Pool.addcert(cert)
        instance = None
    
        def __init__(self):
            cname_list = []
            for i in range(5):
                print("===", len(cname_list))
                print()
                c, cname = self.Pool.create_channel()
                cname_list.append((c, cname))
                print("===", len(cname_list))
            for item in cname_list:
                c, cname = item
                self.Pool.return_channel(c, cname)
            pass
    
        def __new__(cls, *args, **kwargs):
            if cls.instance:
                return cls.instance
            else:
                return super().__new__(cls)
    
        def producer(self, data):
            """
    
            :param data:
            :return:
            """
            data = json.dumps(data)
    
            try:
                c, cname = self.Pool.get_channel()
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC,
                                body=data, )
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_ES,
                                body=data, )
    
                self.Pool.return_channel(c, cname)
            except Exception as e:
                print("发送错误:", e)  # 链接过期
                self.Pool.delconnection(cname)  # channel过期时,删除此链接和此链接下的所有channel
                c, cname = self.Pool.create_channel()  # 创建一个新的链接和channel
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC,
                                body=data, )
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_ES,
                                body=data, )
                self.Pool.return_channel(c, cname)
    
        def producer_message(self, data):
            """
    
            :param data:
            :return:
            """
            data = json.dumps(data)
    
            try:
                c, cname = self.Pool.get_channel()
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_MESSAGE,
                                body=data, )
    
                self.Pool.return_channel(c, cname)
            except Exception as e:
                print("发送错误:", e)  # 链接过期
                self.Pool.delconnection(cname)  # channel过期时,删除此链接和此链接下的所有channel
                c, cname = self.Pool.create_channel()  # 创建一个新的链接和channel
                c.basic_publish(exchange='',
                                routing_key=settings.QUEUE_TOPIC_MESSAGE,
                                body=data, )
                self.Pool.return_channel(c, cname)
    
    
    RabbitMaClientPoll_obj_2 = RabbitMaClientPoll2()
    RabbitMaClientPoll_obj = RabbitMaClientPoll()
  • 相关阅读:
    微信 token ticket jsapi_ticket access_token 获取 getAccessToken get_jsapi_ticket方法
    PHP 日志 记录 函数 支持 数组 对象 新浪 sae 环境 去掉 空格 换行 格式化 输出 数组转字符串
    原生 原始 PHP连接MySQL 代码 参考mysqli pdo
    PHP 数字金额转换成中文大写金额的函数 数字转中文
    使用PHPMailer发送带附件并支持HTML内容的邮件
    设置输出编码格式 header 重定向 执行时间 set_time_limit 错误 报告 级别 error_reporting
    html5 bootstrap pannel table 协议 公告 声明 文书 模板
    指向指针的指针
    二级指针
    c语言:当指针成为参数后
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/14072044.html
Copyright © 2011-2022 走看看