zoukankan      html  css  js  c++  java
  • rabbitmq消费端加入精确控频。

    控制频率之前用的是线程池的数量来控制,很难控制。因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率。

    现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务。

    里面的rabbitpy后来加的,然来是使用pika的,就框架本身得实现写法上违反了开闭原则,没设计太好。好在不影响调用方式。

    与celery相比

    在推送任务方面比celery的delay要快,推送的任务小。

    使用更简单,没那么花哨给函数加装饰器来注册函数路由。

    可以满足生产了。

    比之前的 使用redis原生list结构作为消息队列取代celery框架。 更好,主要是rabbitmq有消费确认的概念,redis没有,对随意关停正在运行的程序会造成任务丢失。

    # -*- coding: utf-8 -*-
    from collections import Callable
    import time
    from threading import Lock
    import unittest
    import rabbitpy
    from pika import BasicProperties
    # noinspection PyUnresolvedReferences
    from rabbitpy.message import Properties
    import pika
    from pika.adapters.blocking_connection import BlockingChannel
    from pymongo.errors import PyMongoError
    from app.utils_ydf import LogManager
    from app.utils_ydf.mixins import LoggerMixin
    from app.utils_ydf import decorators
    from app.utils_ydf import BoundedThreadPoolExecutor
    from app import config as app_config
    
    LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
    LogManager('rabbitpy').get_logger_and_add_handlers(2)
    LogManager('rabbitpy.base').get_logger_and_add_handlers(2)
    
    
    class ExceptionForRetry(Exception):
        """为了重试的,抛出错误。只是定义了一个子类,用不用都可以"""
    
    
    class ExceptionForRabbitmqRequeue(Exception):
        """遇到此错误,重新放回队列中"""
    
    
    class RabbitmqClientRabbitPy:
        """
        使用rabbitpy包。
        """
    
        # noinspection PyUnusedLocal
        def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
            rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}'
            self.connection = rabbitpy.Connection(rabbit_url)
    
        def creat_a_channel(self) -> rabbitpy.AMQP:
            return rabbitpy.AMQP(self.connection.channel())  # 使用适配器,使rabbitpy包的公有方法几乎接近pika包的channel的方法。
    
    
    class RabbitmqClientPika:
        """
        使用pika包,多线程不安全的包。
        """
    
        def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
            """
            parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
    
            connection = pika.SelectConnection(parameters=parameters,
                                      on_open_callback=on_open)
            :param username:
            :param password:
            :param host:
            :param port:
            :param virtual_host:
            :param heartbeat:
            """
            credentials = pika.PlainCredentials(username, password)
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host, port, virtual_host, credentials, heartbeat=heartbeat))
    
        def creat_a_channel(self) -> BlockingChannel:
            return self.connection.channel()
    
    
    class RabbitMqFactory:
        def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1):
            """
            :param username:
            :param password:
            :param port:
            :param virtual_host:
            :param heartbeat:
            :param is_use_rabbitpy: 为0使用pika,多线程不安全。为1使用rabbitpy,多线程安全的包。
            """
            if is_use_rabbitpy:
                self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
            else:
                self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)
    
        def get_rabbit_cleint(self):
            return self.rabbit_client
    
    
    class RabbitmqPublisher(LoggerMixin):
        def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
            """
            :param queue_name:
            :param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika。
            :param log_level_int:
            """
            self._queue_name = queue_name
            self._is_use_rabbitpy = is_use_rabbitpy
            self.logger.setLevel(log_level_int)
            self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
            self.channel = self.rabbit_client.creat_a_channel()
            self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
            self._lock_for_pika = Lock()
            self._lock_for_count = Lock()
            self._current_time = None
            self.count_per_minute = None
            self._init_count()
            self.logger.info(f'{self.__class__} 被实例化了')
    
        def _init_count(self):
            with self._lock_for_count:
                self._current_time = time.time()
                self.count_per_minute = 0
    
        def publish(self, msg: str):
            if self._is_use_rabbitpy:
                self._publish_rabbitpy(msg)
            else:
                self._publish_pika(msg)
            self.logger.debug(f'向{self._queue_name} 队列,推送消息 {msg}')
            """
            # 屏蔽统计减少加锁,能加快速度。
            with self._lock_for_count:
                self.count_per_minute += 1
            if time.time() - self._current_time > 60:
                self._init_count()
                self.logger.info(f'一分钟内推送了 {self.count_per_minute} 条消息到 {self.rabbit_client.connection} 中')
            """
    
        @decorators.tomorrow_threads(100)
        def _publish_rabbitpy(self, msg: str):
            # noinspection PyTypeChecker
            self.channel.basic_publish(
                exchange='',
                routing_key=self._queue_name,
                body=msg,
                properties={'delivery_mode': 2},
            )
    
        def _publish_pika(self, msg: str):
            with self._lock_for_pika:  # 亲测pika多线程publish会出错。
                self.channel.basic_publish(exchange='',
                                           routing_key=self._queue_name,
                                           body=msg,
                                           properties=BasicProperties(
                                               delivery_mode=2,  # make message persistent
                                           )
                                           )
    
        def clear(self):
            self.channel.queue_purge(self._queue_name)
    
        def get_message_count(self):
            if self._is_use_rabbitpy:
                return self._get_message_count_rabbitpy()
            else:
                return self._get_message_count_pika()
    
        def _get_message_count_pika(self):
            queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
            return queue.method.message_count
    
        def _get_message_count_rabbitpy(self):
            ch = self.rabbit_client.connection.channel()
            q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
            q.durable = True
            msg_count = q.declare(passive=True)[0]
            ch.close()
            return msg_count
    
    
    class RabbitmqConsumer(LoggerMixin):
        def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, is_use_rabbitpy=1):
            """
            :param queue_name:
            :param consuming_function: 处理消息的函数,函数有且只能有一个参数,参数表示消息。是为了简单,放弃策略和模板来强制参数。
            :param threads_num:
            :param max_retry_times:
            :param log_level:
            :param is_print_detail_exception:
            :param msg_schedule_time_intercal:消息调度的时间间隔,用于控频
            :param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika.
            """
            self._queue_name = queue_name
            self.consuming_function = consuming_function
            self._threads_num = threads_num
            self.threadpool = BoundedThreadPoolExecutor(threads_num)
            self._max_retry_times = max_retry_times
            self.logger.setLevel(log_level)
            self.logger.info(f'{self.__class__} 被实例化')
            self._is_print_detail_exception = is_print_detail_exception
            self._msg_schedule_time_intercal = msg_schedule_time_intercal
            self._is_use_rabbitpy = is_use_rabbitpy
    
        def start_consuming_message(self):
            if self._is_use_rabbitpy:
                self._start_consuming_message_rabbitpy()
            else:
                self._start_consuming_message_pika()
    
        @decorators.tomorrow_threads(100)
        @decorators.keep_circulating(1)  # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
        def _start_consuming_message_rabbitpy(self):
            # noinspection PyArgumentEqualDefault
            channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type:  rabbitpy.AMQP         #
            channel.queue_declare(queue=self._queue_name, durable=True)
            channel.basic_qos(prefetch_count=self._threads_num)
            for message in channel.basic_consume(self._queue_name):
                body = message.body.decode()
                self.logger.debug(f'从rabbitmq取出的消息是:  {body}')
                time.sleep(self._msg_schedule_time_intercal)
                self.threadpool.submit(self._consuming_function_rabbitpy, message)
    
        def _consuming_function_rabbitpy(self, message: rabbitpy.message.Message, current_retry_times=0):
            if current_retry_times < self._max_retry_times:
                # noinspection PyBroadException
                try:
                    self.consuming_function(message.body.decode())
                    message.ack()
                except Exception as e:
                    if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                        return message.nack(requeue=True)
                    self.logger.error(f'函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,
     原因是 {type(e)}  {e}', exc_info=self._is_print_detail_exception)
                    self._consuming_function_rabbitpy(message, current_retry_times + 1)
            else:
                self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败')  # 错得超过指定的次数了,就确认消费了。
                message.ack()
    
        @decorators.tomorrow_threads(100)
        @decorators.keep_circulating(1)  # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
        def _start_consuming_message_pika(self):
            channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()  # 此处先固定使用pika.
            channel.queue_declare(queue=self._queue_name, durable=True)
            channel.basic_qos(prefetch_count=self._threads_num)
    
            def callback(ch, method, properties, body):
                body = body.decode()
                self.logger.debug(f'从rabbitmq取出的消息是:  {body}')
                time.sleep(self._msg_schedule_time_intercal)
                self.threadpool.submit(self._consuming_function_pika, ch, method, properties, body)
    
            channel.basic_consume(callback,
                                  queue=self._queue_name,
                                  # no_ack=True
                                  )
            channel.start_consuming()
    
        @staticmethod
        def __ack_message_pika(channelx, delivery_tagx):
            """Note that `channel` must be the same pika channel instance via which
            the message being ACKed was retrieved (AMQP protocol constraint).
            """
            if channelx.is_open:
                channelx.basic_ack(delivery_tagx)
            else:
                # Channel is already closed, so we can't ACK this message;
                # log and/or do something that makes sense for your app in this case.
                pass
    
        def _consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
            if current_retry_times < self._max_retry_times:
                # noinspection PyBroadException
                try:
                    self.consuming_function(body)
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                    # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
                except Exception as e:
                    if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                        return ch.basic_nack(delivery_tag=method.delivery_tag)
                    self.logger.error(f'函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,
     原因是 {type(e)}  {e}', exc_info=self._is_print_detail_exception)
                    self._consuming_function_pika(ch, method, properties, body, current_retry_times + 1)
            else:
                self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败')  # 错得超过指定的次数了,就确认消费了。
                ch.basic_ack(delivery_tag=method.delivery_tag)
                # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
    
    
    # noinspection PyMethodMayBeStatic
    class _Test(unittest.TestCase):
        def test_publish(self):
            rabbitmq_publisher = RabbitmqPublisher('queue_test', is_use_rabbitpy=1, log_level_int=10)
            [rabbitmq_publisher.publish(str(msg)) for msg in range(2000)]
    
        def test_consume(self):
            def f(body):
                print('....  ', body)
                time.sleep(10)  # 模拟做某事需要阻塞10秒种,必须用并发。
    
            rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, is_use_rabbitpy=1, msg_schedule_time_intercal=0.5)
            rabbitmq_consumer.start_consuming_message()
    
    
    if __name__ == '__main__':
        unittest.main()
  • 相关阅读:
    php-fpm sock文件权限设置
    window netsh interface portproxy 配置转发
    powershell 删除8天前的日志
    Ansible拷贝文件遇到的问题
    Git-Credential-Manager-for-Mac-and-Linux
    MySQL开启远程连接的方法
    mac安装神器brew
    ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
    Linux中如何安装RAR
    Linux常用压缩和解压命令
  • 原文地址:https://www.cnblogs.com/ydf0509/p/10272375.html
Copyright © 2011-2022 走看看