zoukankan      html  css  js  c++  java
  • python万能消费框架,新增7种中间件(或操作mq的包)和三种并发模式。

    新增的中间件和并发模式见注释。

    消息队列中间件方面celery支持的,都要支持。并发模式,celery支持的都要支持。

    从无限重复相似代码抽取框架,做成万能复用,是生产力的保障。

    使用模板模式使加新中间件时候,在改实现消费框架的代码非常方便,不会影响到原有中间件使用。

    使用策略模式使加入新的并发模式,,在改实现消费框架的代码非常方便,不会影响到原有并发模式。

    所以实现消费框架的代码虽然很长有1000多行,但修改和增加的时候不会出现如履薄冰的害怕情绪。

    使用工厂模式,使得调用框架时候,非常容易切换基于不同消息中间件的使用,只需要改一个数字就改变消费和推送代码使用的中间件。

    使快速测试不同种类的中间件和并发方式变得很容易。 

    7种中间件包括使用pika  rabbitpy  aqpstorm操作rabbitmq、基于redis的list数据结构、基于mongo queue包实现的mongo消息队列 、基于python Queue对象的消息队列(随着python解释器退出而消失)、基于使用persitqueue包实现的sqllite3本地持久化队列。

    3种并发模式为thread、gevent、evenlet模式。(支持基于多进程的分布式模式,由于启动进程必须是在___name__ = main里面,所以需要用户自己写,自己写Process(target=f).start()。如果要使用多进程,一般也是前面三种模式 加 进程模式配合,例如多进程 + gevent ,只有100%纯cpu计算的才适合纯多进程。)

    使用多进程的目的,开32进程,最高可以使双路e5 32核 cpu使用率达到3200%, 将cpu打满 ,充分利用cpu资源。 如果不开多进程,就算程序忙的要命,cpu使用率过不了110%,浪费cpu资源。

       1 # -*- coding: utf-8 -*-
       2 # @Author  : ydf
       3 
       4 """
       5 类celery的worker模式,可用于一切需要分布式并发的地方,最好是io类型的。可以分布式调度起一切函数。
       6 rabbitmq生产者和消费者框架。完全实现了celery worker模式的全部功能,使用更简单。支持自动重试指定次数,
       7 消费确认,指定数量的并发线程,和指定频率控制1秒钟只运行几次, 同时对mongodb类型的异常做了特殊处理
       8 最开始写得是使用pika包,非线程安全,后来加入rabbitpy,rabbitpy包推送会丢失部分数据,推荐pika包使用
       9 单下划线代表保护,双下划线代表私有。只要关注公有方法就可以,其余是类内部自调用方法。
      10 
      11 
      12 
      13 3月15日
      14 1)、新增RedisConsumer 是基于redis中间件的消费框架,不支持随意暂停程序或者断点,会丢失一部分正在运行中的任务,推荐使用rabbitmq的方式。
      15 get_consumer是使用工厂模式来生成基于rabbit和reids的消费者,使用不同中间件的消费框架更灵活一点点,只需要修改一个数字。
      16 
      17 3月20日
      18 2)、增加支持函数参数过滤的功能,可以随时放心多次推送相同的任务到中间件,会先检查该任务是否需要执行,避免浪费cpu和流量,加快处理速度。
      19 基于函数参数值的过滤,需要设置 do_task_filtering 参数为True才生效,默认为False。
      20 3)、新增支持了函数的参数是多个参数,需要设置is_consuming_function_use_multi_params 为True生效,为了兼容老代码默认为False。
      21 区别是消费函数原来需要
      22 def f(body):   # 函数有且只能有一个参数,是字典的多个键值对来表示参数的值。
      23     print(body['a'])
      24     print(body['b'])
      25 
      26 现在可以
      27 def f(a,b):
      28     print(a)
      29     print(b)
      30 
      31 对于推送的部分,都是一样的,都是推送 {"a":1,"b":2}
      32 
      33 开启消费都是   get_consumer('queue_test', consuming_function=f).start_consuming_message()
      34 
      35 6月3日
      36 1) 增加了RedisPublisher类,和增加get_publisher工厂模式
      37 方法同mqpublisher一样,这是为了增强一致性,以后每个业务的推送和消费,
      38 如果不直接使用RedisPublisher  RedisConsumerer RabbitmqPublisher RabbitMQConsumer这些类,而是使用get_publisher和get_consumer来获取发布和消费对象,
      39 支持修改一个全局变量的broker_kind数字来切换所有平台消费和推送的中间件种类。
      40 2)增加指定不运行的时间的配置。例如可以白天不运行,只在晚上运行。
      41 3)增加了函数超时的配置,当函数运行时间超过n秒后,自动杀死函数,抛出异常。
      42 4) 增加每分钟函数运行次数统计,和按照最近一分钟运行函数次数来预估多久可以运行完成当前队列剩余的任务。
      43 5) 增加一个判断函数,阻塞判断连续多少分钟队列里面是空的。判断任务疑似完成。
      44 6)增加一个终止消费者的标志,设置标志后终止循环调度消息。
      45 7) consumer对象增加内置一个属性,表示相同队列名的publisher实例。
      46 
      47 6月29日
      48 1) 增加消息过期时间的配置,消费时候距离发布时候超过一定时间,丢弃任务。
      49 2)增加基于python内置Queue对象的本地队列作为中间件的发布者和消费者,公有方法的用法与redis和mq的完全一致,
      50 方便没有安装mq和redis的环境使用测试除分布式以外的其他主要功能。使用内置queue无法分布式和不支持程序重启任务接续。
      51 好处是可以改一个数字就把代码运行起来在本地测试,不会接受和推送消息到中间件影响别人,别人也影响不了自己,自测很合适。
      52 3)实例化发布者时候,不在初始化方法中链接中间件,延迟到首次真正使用操作中间件的方法。
      53 4)BoundedThreadpoolExecutor替换成了新的CustomThreadpoolExecutor
      54 
      55 
      56 7月2日
      57 加入了gevent并发模式,设置concurrent_mode为2生效。
      58 
      59 7月3日
      60 加入了evenlet并发模式,设置concurrent_mode为3生效。
      61 
      62 7月4日
      63 1)增加使用amqpstorm实现的rabbit操作的中间件,设置broker_kind为4生效,支持消费确认
      64 2)增加mongo-queue实现的mongodb为中间件的队列,设置broker_kind为5生效,支持确认消费
      65 3)增加persistqueue sqllite3实现的本地持久化队列,支持多进程和多次启动不在同一个解释器下的本地分布式。比python内置Queue对象增加了持久化和支持不同启动批次的脚本推送 消费。sqllite不需要安装这个中间件就可以更方便使用。设置broker_kind为6生效,支持确认消费。
      66 
      67 """
      68 # import functools
      69 import abc
      70 # import atexit
      71 import atexit
      72 import copy
      73 from queue import Queue
      74 import threading
      75 import gevent
      76 import eventlet
      77 import traceback
      78 import typing
      79 import json
      80 from collections import Callable, OrderedDict
      81 import time
      82 from functools import wraps
      83 from threading import Lock, Thread
      84 import unittest
      85 
      86 from mongomq import MongoQueue  # pip install mongo-mq==0.0.1
      87 import sqlite3
      88 import persistqueue  # pip install persist-queue==0.4.2
      89 import amqpstorm  # pip install AMQPStorm==2.7.1
      90 from amqpstorm.basic import Basic as AmqpStormBasic
      91 from amqpstorm.queue import Queue as AmqpStormQueue
      92 import rabbitpy
      93 from pika import BasicProperties
      94 # noinspection PyUnresolvedReferences
      95 from pika.exceptions import ChannelClosed
      96 # from rabbitpy.message import Properties
      97 import pika
      98 from pika.adapters.blocking_connection import BlockingChannel
      99 from pymongo.errors import PyMongoError
     100 from app.utils_ydf import (LogManager, LoggerMixin, RedisMixin, RedisBulkWriteHelper, RedisOperation, decorators, time_util, LoggerLevelSetterMixin, nb_print, CustomThreadPoolExecutor, MongoMixin)
     101 # noinspection PyUnresolvedReferences
     102 from app.utils_ydf import BoundedThreadPoolExecutor, block_python_exit
     103 from app.utils_ydf.custom_evenlet_pool_executor import CustomEventletPoolExecutor, check_evenlet_monkey_patch, evenlet_timeout_deco
     104 from app.utils_ydf.custom_gevent_pool_executor import GeventPoolExecutor, check_gevent_monkey_patch, gevent_timeout_deco
     105 from app import config as app_config
     106 
     107 # LogManager('pika').get_logger_and_add_handlers(10)
     108 # LogManager('pika.heartbeat').get_logger_and_add_handlers(10)
     109 # LogManager('rabbitpy').get_logger_and_add_handlers(10)
     110 # LogManager('rabbitpy.base').get_logger_and_add_handlers(10)
     111 from app.utils_ydf.custom_threadpool_executor import check_not_monkey
     112 
     113 
     114 def delete_keys_from_dict(dictx: dict, keys: list):
     115     for dict_key in keys:
     116         dictx.pop(dict_key)
     117 
     118 
     119 def delete_keys_and_return_new_dict(dictx: dict, keys: list):
     120     dict_new = copy.copy(dictx)  # 主要是去掉一级键 publish_time,浅拷贝即可。
     121     for dict_key in keys:
     122         try:
     123             dict_new.pop(dict_key)
     124         except KeyError:
     125             pass
     126     return dict_new
     127 
     128 
     129 class ExceptionForRetry(Exception):
     130     """为了重试的,抛出错误。只是定义了一个子类,用不用都可以"""
     131 
     132 
     133 class ExceptionForRequeue(Exception):
     134     """框架检测到此错误,重新放回队列中"""
     135 
     136 
     137 class ExceptionForRabbitmqRequeue(ExceptionForRequeue):  # 以后去掉这个异常,抛出上面那个异常就可以了。
     138     """遇到此错误,重新放回队列中"""
     139 
     140 
     141 class RabbitmqClientRabbitPy:
     142     """
     143     使用rabbitpy包。
     144     """
     145 
     146     # noinspection PyUnusedLocal
     147     def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
     148         rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}?heartbeat={heartbeat}'
     149         self.connection = rabbitpy.Connection(rabbit_url)
     150 
     151     def creat_a_channel(self) -> rabbitpy.AMQP:
     152         return rabbitpy.AMQP(self.connection.channel())  # 使用适配器,使rabbitpy包的公有方法几乎接近pika包的channel的方法。
     153 
     154 
     155 class RabbitmqClientPika:
     156     """
     157     使用pika包,多线程不安全的包。
     158     """
     159 
     160     def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
     161         """
     162         parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
     163 
     164         connection = pika.SelectConnection(parameters=parameters,
     165                                   on_open_callback=on_open)
     166         :param username:
     167         :param password:
     168         :param host:
     169         :param port:
     170         :param virtual_host:
     171         :param heartbeat:
     172         """
     173         credentials = pika.PlainCredentials(username, password)
     174         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
     175             host, port, virtual_host, credentials, heartbeat=heartbeat))
     176         # self.connection = pika.SelectConnection(pika.ConnectionParameters(
     177         #     host, port, virtual_host, credentials, heartbeat=heartbeat))
     178 
     179     def creat_a_channel(self) -> BlockingChannel:
     180         return self.connection.channel()
     181 
     182 
     183 class RabbitMqFactory:
     184     def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10, is_use_rabbitpy=0):
     185         """
     186         :param username:
     187         :param password:
     188         :param port:
     189         :param virtual_host:
     190         :param heartbeat:
     191         :param is_use_rabbitpy: 为0使用pika,多线程不安全。为1使用rabbitpy,多线程安全的包。
     192         """
     193         if is_use_rabbitpy:
     194             self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
     195         else:
     196             self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)
     197 
     198     def get_rabbit_cleint(self):
     199         return self.rabbit_client
     200 
     201 
     202 class AbstractPublisher(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
     203     has_init_broker = 0
     204 
     205     def __init__(self, queue_name, log_level_int=10, logger_prefix='', is_add_file_handler=True, clear_queue_within_init=False, is_add_publish_time=False, ):
     206         """
     207         :param queue_name:
     208         :param log_level_int:
     209         :param logger_prefix:
     210         :param is_add_file_handler:
     211         :param clear_queue_within_init:
     212         """
     213         self._queue_name = queue_name
     214         if logger_prefix != '':
     215             logger_prefix += '--'
     216         logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}'
     217         self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level_int, log_filename=f'{logger_name}.log' if is_add_file_handler else None)  #
     218         # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
     219         # self.channel = self.rabbit_client.creat_a_channel()
     220         # self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
     221         self._lock_for_pika = Lock()
     222         self._lock_for_count = Lock()
     223         self._current_time = None
     224         self.count_per_minute = None
     225         self._init_count()
     226         self.custom_init()
     227         self.logger.info(f'{self.__class__} 被实例化了')
     228         self.publish_msg_num_total = 0
     229         self._is_add_publish_time = is_add_publish_time
     230         # atexit.register(self.__at_exit)
     231         if clear_queue_within_init:
     232             self.clear()
     233 
     234     def set_is_add_publish_time(self, is_add_publish_time=True):
     235         self._is_add_publish_time = is_add_publish_time
     236         return self
     237 
     238     def _init_count(self):
     239         with self._lock_for_count:
     240             self._current_time = time.time()
     241             self.count_per_minute = 0
     242 
     243     def custom_init(self):
     244         pass
     245 
     246     def publish(self, msg: typing.Union[str, dict]):
     247         if isinstance(msg, str):
     248             msg = json.loads(msg)
     249         if self._is_add_publish_time:
     250             # msg.update({'publish_time': time.time(), 'publish_time_format': time_util.DatetimeConverter().datetime_str})
     251             msg.update({'publish_time': round(time.time(), 4), })
     252         t_start = time.time()
     253         decorators.handle_exception(retry_times=10, is_throw_error=True, time_sleep=0.1)(self.concrete_realization_of_publish)(json.dumps(msg))
     254         self.logger.debug(f'向{self._queue_name} 队列,推送消息 耗时{round(time.time() - t_start, 4)}秒  {msg}')
     255         with self._lock_for_count:
     256             self.count_per_minute += 1
     257             self.publish_msg_num_total += 1
     258         if time.time() - self._current_time > 10:
     259             self.logger.info(f'10秒内推送了 {self.count_per_minute} 条消息,累计推送了 {self.publish_msg_num_total} 条消息到 {self._queue_name} 中')
     260             self._init_count()
     261 
     262     @abc.abstractmethod
     263     def concrete_realization_of_publish(self, msg):
     264         raise NotImplementedError
     265 
     266     @abc.abstractmethod
     267     def clear(self):
     268         raise NotImplementedError
     269 
     270     @abc.abstractmethod
     271     def get_message_count(self):
     272         raise NotImplementedError
     273 
     274     @abc.abstractmethod
     275     def close(self):
     276         raise NotImplementedError
     277 
     278     def __enter__(self):
     279         return self
     280 
     281     def __exit__(self, exc_type, exc_val, exc_tb):
     282         self.close()
     283         self.logger.warning(f'with中自动关闭publisher连接,累计推送了 {self.publish_msg_num_total} 条消息 ')
     284 
     285     def __at_exit(self):
     286         self.logger.warning(f'程序关闭前,累计推送了 {self.publish_msg_num_total} 条消息 到 {self._queue_name} 中')
     287 
     288 
     289 def deco_mq_conn_error(f):
     290     @wraps(f)
     291     def _deco_mq_conn_error(self, *args, **kwargs):
     292         if not self.has_init_broker:
     293             self.logger.warning(f'对象的方法 【{f.__name__}】 首次使用 rabbitmq channel,进行初始化执行 init_broker 方法')
     294             self.init_broker()
     295             self.has_init_broker = 1
     296             return f(self, *args, **kwargs)
     297         # noinspection PyBroadException
     298         try:
     299             return f(self, *args, **kwargs)
     300         except (pika.exceptions.AMQPError, amqpstorm.AMQPError) as e:  # except Exception as e:   # 现在装饰器用到了绝大多出地方,单个异常类型不行。ex
     301             self.logger.error(f'rabbitmq链接出错   ,方法 {f.__name__}  出错 ,{e}')
     302             self.init_broker()
     303             return f(self, *args, **kwargs)
     304 
     305     return _deco_mq_conn_error
     306 
     307 
     308 class RabbitmqPublisher(AbstractPublisher):
     309     """
     310     使用pika实现的。
     311     """
     312 
     313     # noinspection PyAttributeOutsideInit
     314     def init_broker(self):
     315         self.logger.warning(f'使用pika 链接mq')
     316         self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint()
     317         self.channel = self.rabbit_client.creat_a_channel()
     318         self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
     319 
     320     # noinspection PyAttributeOutsideInit
     321     @deco_mq_conn_error
     322     def concrete_realization_of_publish(self, msg):
     323         with self._lock_for_pika:  # 亲测pika多线程publish会出错
     324             self.channel.basic_publish(exchange='',
     325                                        routing_key=self._queue_name,
     326                                        body=msg,
     327                                        properties=BasicProperties(
     328                                            delivery_mode=2,  # make message persistent   2(1是非持久化)
     329                                        )
     330                                        )
     331 
     332     @deco_mq_conn_error
     333     def clear(self):
     334         self.channel.queue_purge(self._queue_name)
     335         self.logger.warning(f'清除 {self._queue_name} 队列中的消息成功')
     336 
     337     @deco_mq_conn_error
     338     def get_message_count(self):
     339         with self._lock_for_pika:
     340             queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
     341             return queue.method.message_count
     342 
     343     # @deco_mq_conn_error
     344     def close(self):
     345         self.channel.close()
     346         self.rabbit_client.connection.close()
     347         self.logger.warning('关闭pika包 链接')
     348 
     349 
     350 class RabbitmqPublisherUsingRabbitpy(AbstractPublisher):
     351     """
     352     使用rabbitpy包实现的。
     353     """
     354 
     355     # noinspection PyAttributeOutsideInit
     356     def init_broker(self):
     357         self.logger.warning(f'使用rabbitpy包 链接mq')
     358         self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint()
     359         self.channel = self.rabbit_client.creat_a_channel()
     360         self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
     361 
     362     # @decorators.tomorrow_threads(10)
     363     @deco_mq_conn_error
     364     def concrete_realization_of_publish(self, msg):
     365         # noinspection PyTypeChecker
     366         self.channel.basic_publish(
     367             exchange='',
     368             routing_key=self._queue_name,
     369             body=msg,
     370             properties={'delivery_mode': 2},
     371         )
     372 
     373     @deco_mq_conn_error
     374     def clear(self):
     375         self.channel.queue_purge(self._queue_name)
     376         self.logger.warning(f'清除 {self._queue_name} 队列中的消息成功')
     377 
     378     @deco_mq_conn_error
     379     def get_message_count(self):
     380         # noinspection PyUnresolvedReferences
     381         ch_raw_rabbity = self.channel.channel
     382         return rabbitpy.amqp_queue.Queue(ch_raw_rabbity, self._queue_name, durable=True)
     383 
     384     # @deco_mq_conn_error
     385     def close(self):
     386         self.channel.close()
     387         self.rabbit_client.connection.close()
     388         self.logger.warning('关闭rabbitpy包 链接mq')
     389 
     390 
     391 class RabbitmqPublisherUsingAmqpStorm(AbstractPublisher):
     392     # 使用amqpstorm包实现的mq操作。
     393     # 实例属性没在init里面写,造成补全很麻烦,写在这里做类属性,方便pycharm补全
     394     connection = amqpstorm.UriConnection
     395     channel = amqpstorm.Channel
     396     channel_wrapper_by_ampqstormbaic = AmqpStormBasic
     397     queue = AmqpStormQueue
     398 
     399     # noinspection PyAttributeOutsideInit
     400     # @decorators.synchronized
     401     def init_broker(self):
     402         # username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10
     403         self.logger.warning(f'使用AmqpStorm包 链接mq')
     404         self.connection = amqpstorm.UriConnection(
     405             f'amqp://{app_config.RABBITMQ_USER}:{app_config.RABBITMQ_PASS}@{app_config.RABBITMQ_HOST}:{app_config.RABBITMQ_PORT}/{app_config.RABBITMQ_VIRTUAL_HOST}?heartbeat={60 * 10}'
     406         )
     407         self.channel = self.connection.channel()  # type:amqpstorm.Channel
     408         self.channel_wrapper_by_ampqstormbaic = AmqpStormBasic(self.channel)
     409         self.queue = AmqpStormQueue(self.channel)
     410         self.queue.declare(queue=self._queue_name, durable=True)
     411 
     412     # @decorators.tomorrow_threads(10)
     413     @deco_mq_conn_error
     414     def concrete_realization_of_publish(self, msg):
     415         self.channel_wrapper_by_ampqstormbaic.publish(exchange='',
     416                                                       routing_key=self._queue_name,
     417                                                       body=msg,
     418                                                       properties={'delivery_mode': 2}, )
     419         # nb_print(msg)
     420 
     421     @deco_mq_conn_error
     422     def clear(self):
     423         self.queue.purge(self._queue_name)
     424         self.logger.warning(f'清除 {self._queue_name} 队列中的消息成功')
     425 
     426     @deco_mq_conn_error
     427     def get_message_count(self):
     428         # noinspection PyUnresolvedReferences
     429         return self.queue.declare(queue=self._queue_name, durable=True)['message_count']
     430 
     431     # @deco_mq_conn_error
     432     def close(self):
     433         self.channel.close()
     434         self.connection.close()
     435         self.logger.warning('关闭rabbitpy包 链接mq')
     436 
     437 
     438 class RedisPublisher(AbstractPublisher, RedisMixin):
     439     """
     440     使用redis作为中间件
     441     """
     442 
     443     def concrete_realization_of_publish(self, msg):
     444         # noinspection PyTypeChecker
     445         self.redis_db7.rpush(self._queue_name, msg)
     446 
     447     def clear(self):
     448         self.redis_db7.delete(self._queue_name)
     449         self.logger.warning(f'清除 {self._queue_name} 队列中的消息成功')
     450 
     451     def get_message_count(self):
     452         # nb_print(self.redis_db7,self._queue_name)
     453         return self.redis_db7.llen(self._queue_name)
     454 
     455     def close(self):
     456         # self.redis_db7.connection_pool.disconnect()
     457         pass
     458 
     459 
     460 class MongoMqPublisher(AbstractPublisher, MongoMixin):
     461     # 使用mongo-queue包实现的基于mongodb的队列。
     462     # noinspection PyAttributeOutsideInit
     463     def custom_init(self):
     464         self.queue = MongoQueue(
     465             self.mongo_16_client.get_database('conqume_queues').get_collection(self._queue_name),
     466             consumer_id=f"consumer-{time_util.DatetimeConverter().datetime_str}",
     467             timeout=600,
     468             max_attempts=3,
     469             ttl=0)
     470 
     471     def concrete_realization_of_publish(self, msg):
     472         # noinspection PyTypeChecker
     473         self.queue.put(json.loads(msg))
     474 
     475     def clear(self):
     476         self.queue.clear()
     477         self.logger.warning(f'清除 mongo队列 {self._queue_name} 中的消息成功')
     478 
     479     def get_message_count(self):
     480         return self.queue.size()
     481 
     482     def close(self):
     483         pass
     484 
     485 
     486 class PersistQueuePublisher(AbstractPublisher):
     487     """
     488     使用persistqueue实现的本地持久化队列。
     489     这个是本地持久化,支持本地多个启动的python脚本共享队列任务。与LocalPythonQueuePublisher相比,不会随着python解释器退出,导致任务丢失。
     490     """
     491 
     492     # noinspection PyAttributeOutsideInit
     493     def custom_init(self):
     494         # noinspection PyShadowingNames
     495         def _my_new_db_connection(self, path, multithreading, timeout):  # 主要是改了sqlite文件后缀,方便pycharm识别和打开。
     496             # noinspection PyUnusedLocal
     497             conn = None
     498             if path == self._MEMORY:
     499                 conn = sqlite3.connect(path,
     500                                        check_same_thread=not multithreading)
     501             else:
     502                 conn = sqlite3.connect('{}/data.sqlite'.format(path),
     503                                        timeout=timeout,
     504                                        check_same_thread=not multithreading)
     505             conn.execute('PRAGMA journal_mode=WAL;')
     506             return conn
     507 
     508         persistqueue.SQLiteAckQueue._new_db_connection = _my_new_db_connection  # 打猴子补丁。
     509         # REMIND 官方测试基于sqlite的本地持久化,比基于纯文件的持久化,使用相同固态硬盘和操作系统情况下,速度快3倍以上,所以这里选用sqlite方式。
     510 
     511         self.queue = persistqueue.SQLiteAckQueue(path='/sqllite_queues', name=self._queue_name, auto_commit=True, serializer=json, multithreading=True)
     512 
     513     def concrete_realization_of_publish(self, msg):
     514         # noinspection PyTypeChecker
     515         self.queue.put(msg)
     516 
     517     # noinspection PyProtectedMember
     518     def clear(self):
     519         sql = f'{"DELETE"}  {"FROM"} ack_queue_{self._queue_name}'
     520         self.logger.info(sql)
     521         self.queue._getter.execute(sql)
     522         self.queue._getter.commit()
     523         self.logger.warning(f'清除 本地持久化队列 {self._queue_name} 中的消息成功')
     524 
     525     def get_message_count(self):
     526         return self.queue.qsize()
     527 
     528     def close(self):
     529         pass
     530 
     531 
     532 local_pyhton_queue_name__local_pyhton_queue_obj_map = dict()  # 使local queue和其他中间件完全一样的使用方式,使用映射保存队列的名字,使消费和发布通过队列名字能找到队列对象。
     533 
     534 
     535 class LocalPythonQueuePublisher(AbstractPublisher):
     536     """
     537     使用redis作为中间件
     538     """
     539 
     540     # noinspection PyAttributeOutsideInit
     541     def custom_init(self):
     542         if self._queue_name not in local_pyhton_queue_name__local_pyhton_queue_obj_map:
     543             local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name] = Queue()
     544         self.queue = local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name]
     545 
     546     def concrete_realization_of_publish(self, msg):
     547         # noinspection PyTypeChecker
     548         self.queue.put(msg)
     549 
     550     def clear(self):
     551         # noinspection PyUnresolvedReferences
     552         self.queue.queue.clear()
     553         self.logger.warning(f'清除 本地队列中的消息成功')
     554 
     555     def get_message_count(self):
     556         return self.queue.qsize()
     557 
     558     def close(self):
     559         pass
     560 
     561 
     562 class RedisFilter(RedisMixin):
     563     def __init__(self, redis_key_name):
     564         self._redis_key_name = redis_key_name
     565 
     566     @staticmethod
     567     def _get_ordered_str(value):
     568         """对json的键值对在redis中进行过滤,需要先把键值对排序,否则过滤会不准确如 {"a":1,"b":2} 和 {"b":2,"a":1}"""
     569         if isinstance(value, str):
     570             value = json.loads(value)
     571         ordered_dict = OrderedDict()
     572         for k in sorted(value):
     573             ordered_dict[k] = value[k]
     574         return json.dumps(ordered_dict)
     575 
     576     def add_a_value(self, value: typing.Union[str, dict]):
     577         self.redis_db7.sadd(self._redis_key_name, self._get_ordered_str(value))
     578 
     579     def check_value_exists(self, value):
     580         return self.redis_db7.sismember(self._redis_key_name, self._get_ordered_str(value))
     581 
     582 
     583 class AbstractConsumer(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
     584     time_interval_for_check_do_not_run_time = 60
     585     BROKER_KIND = None
     586 
     587     @property
     588     @decorators.synchronized
     589     def publisher_of_same_queue(self):
     590         if not self._publisher_of_same_queue:
     591             self._publisher_of_same_queue = get_publisher(self._queue_name, broker_kind=self.BROKER_KIND)
     592             if self._msg_expire_senconds:
     593                 self._publisher_of_same_queue.set_is_add_publish_time()
     594         return self._publisher_of_same_queue
     595 
     596     @classmethod
     597     def join_shedual_task_thread(cls):
     598         """
     599 
     600         :return:
     601         """
     602         """
     603         def ff():
     604             RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平台消费', is_consuming_function_use_multi_params=True).start_consuming_message()
     605             RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平台消费', is_consuming_function_use_multi_params=True).start_consuming_message()
     606             AbstractConsumer.join_shedual_task_thread()            # 如果开多进程启动消费者,在linux上需要这样写下这一行。
     607 
     608 
     609         if __name__ == '__main__':
     610             [Process(target=ff).start() for _ in range(4)]
     611 
     612         """
     613         ConcurrentModeDispatcher.join()
     614 
     615     def __init__(self, queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1,
     616                  max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0,
     617                  logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True,
     618                  is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False):
     619         """
     620         :param queue_name:
     621         :param consuming_function: 处理消息的函数。
     622         :param function_timeout : 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
     623         :param threads_num:
     624         :param specify_threadpool:使用指定的线程池,可以多个消费者共使用一个线程池,不为None时候。threads_num失效
     625         :param concurrent_mode:并发模式,暂时支持 线程 、gevent、eventlet三种模式。  1线程  2 gevent 3 evenlet
     626         :param max_retry_times:
     627         :param log_level:
     628         :param is_print_detail_exception:
     629         :param msg_schedule_time_intercal:消息调度的时间间隔,用于控频
     630         :param logger_prefix: 日志前缀,可使不同的消费者生成不同的日志
     631         :param create_logger_file : 是否创建文件日志
     632         :param do_task_filtering :是否执行基于函数参数的任务过滤
     633         :is_consuming_function_use_multi_params  函数的参数是否是传统的多参数,不为单个body字典表示多个参数。
     634         :param is_do_not_run_by_specify_time_effect :是否使不运行的时间段生效
     635         :param do_not_run_by_specify_time   :不运行的时间段
     636         :param schedule_tasks_on_main_thread :直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
     637         """
     638         self._queue_name = queue_name
     639         self.queue_name = queue_name  # 可以换成公有的,免得外部访问有警告。
     640         self.consuming_function = consuming_function
     641         self._function_timeout = function_timeout
     642         self._threads_num = threads_num
     643         self._specify_threadpool = specify_threadpool
     644         self._threadpool = None  # 单独加一个检测消息数量和心跳的线程
     645         self._concurrent_mode = concurrent_mode
     646         self._max_retry_times = max_retry_times
     647         self._is_print_detail_exception = is_print_detail_exception
     648         self._msg_schedule_time_intercal = msg_schedule_time_intercal if msg_schedule_time_intercal > 0.001 else 0.001
     649         self._msg_expire_senconds = msg_expire_senconds
     650 
     651         if self._concurrent_mode not in (1, 2, 3):
     652             raise ValueError('设置的并发模式不正确')
     653         self._concurrent_mode_dispatcher = ConcurrentModeDispatcher(self)
     654 
     655         self._logger_prefix = logger_prefix
     656         self._log_level = log_level
     657         if logger_prefix != '':
     658             logger_prefix += '--'
     659         logger_name = f'{logger_prefix}{self.__class__.__name__}--{self._concurrent_mode_dispatcher.concurrent_name}--{queue_name}'
     660         # nb_print(logger_name)
     661         self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level, log_filename=f'{logger_name}.log' if create_logger_file else None)
     662         self.logger.info(f'{self.__class__} 被实例化')
     663 
     664         self._do_task_filtering = do_task_filtering
     665         self._redis_filter_key_name = f'filter:{queue_name}'
     666         self._redis_filter = RedisFilter(self._redis_filter_key_name)
     667 
     668         self._is_consuming_function_use_multi_params = is_consuming_function_use_multi_params
     669         self._lock_for_pika = Lock()
     670 
     671         self._execute_task_times_every_minute = 0  # 每分钟执行了多少次任务。
     672         self._lock_for_count_execute_task_times_every_minute = Lock()
     673         self._current_time_for_execute_task_times_every_minute = time.time()
     674 
     675         self._msg_num_in_broker = 0
     676         self._last_timestamp_when_has_task_in_queue = 0
     677         self._last_timestamp_print_msg_num = 0
     678 
     679         self._is_do_not_run_by_specify_time_effect = is_do_not_run_by_specify_time_effect
     680         self._do_not_run_by_specify_time = do_not_run_by_specify_time  # 可以设置在指定的时间段不运行。
     681         self._schedule_tasks_on_main_thread = schedule_tasks_on_main_thread
     682 
     683         self.stop_flag = False
     684 
     685         self._publisher_of_same_queue = None
     686 
     687     @property
     688     @decorators.synchronized
     689     def threadpool(self):
     690         return self._concurrent_mode_dispatcher.build_pool()
     691 
     692     def keep_circulating(self, time_sleep=0.001, exit_if_function_run_sucsess=False, is_display_detail_exception=True):
     693         """间隔一段时间,一直循环运行某个方法的装饰器
     694         :param time_sleep :循环的间隔时间
     695         :param is_display_detail_exception
     696         :param exit_if_function_run_sucsess :如果成功了就退出循环
     697         """
     698 
     699         def _keep_circulating(func):
     700             # noinspection PyBroadException
     701             @wraps(func)
     702             def __keep_circulating(*args, **kwargs):
     703                 while 1:
     704                     if self.stop_flag:
     705                         break
     706                     try:
     707                         result = func(*args, **kwargs)
     708                         if exit_if_function_run_sucsess:
     709                             return result
     710                     except Exception as e:
     711                         msg = func.__name__ + '   运行出错
     ' + traceback.format_exc(limit=10) if is_display_detail_exception else str(e)
     712                         self.logger.error(msg)
     713                     finally:
     714                         time.sleep(time_sleep)
     715 
     716             return __keep_circulating
     717 
     718         return _keep_circulating
     719 
     720     def start_consuming_message(self):
     721         self.logger.warning(f'开始消费 {self._queue_name} 中的消息')
     722         # self.threadpool.submit(decorators.keep_circulating(20)(self.check_heartbeat_and_message_count))
     723         self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count))
     724         if self._schedule_tasks_on_main_thread:
     725             # decorators.keep_circulating(1)(self._shedual_task)()
     726             self.keep_circulating(1)(self._shedual_task)()
     727         else:
     728             # t = Thread(target=decorators.keep_circulating(1)(self._shedual_task))
     729             self._concurrent_mode_dispatcher.schedulal_task_with_no_block()
     730 
     731     @abc.abstractmethod
     732     def _shedual_task(self):
     733         raise NotImplementedError
     734 
     735     def _run_consuming_function_with_confirm_and_retry(self, kw: dict, current_retry_times=0):
     736         if self._do_task_filtering and self._redis_filter.check_value_exists(kw['body']):  # 对函数的参数进行检查,过滤已经执行过并且成功的任务。
     737             self.logger.info(f'redis的 [{self._redis_filter_key_name}] 键 中 过滤任务 {kw["body"]}')
     738             self._confirm_consume(kw)
     739             return
     740         with self._lock_for_count_execute_task_times_every_minute:
     741             self._execute_task_times_every_minute += 1
     742             if time.time() - self._current_time_for_execute_task_times_every_minute > 60:
     743                 self.logger.info(
     744                     f'一分钟内执行了 {self._execute_task_times_every_minute} 次函数 [ {self.consuming_function.__name__} ] ,预计'
     745                     f'还需要 {time_util.seconds_to_hour_minute_second(self._msg_num_in_broker / self._execute_task_times_every_minute * 60)} 时间'
     746                     f'才能执行完成 {self._msg_num_in_broker}个剩余的任务 ')
     747                 self._current_time_for_execute_task_times_every_minute = time.time()
     748                 self._execute_task_times_every_minute = 0
     749 
     750         if current_retry_times < self._max_retry_times + 1:
     751             # noinspection PyBroadException
     752             t_start = time.time()
     753             try:
     754                 function_run = self.consuming_function if self._function_timeout == 0 else self._concurrent_mode_dispatcher.timeout_deco(self._function_timeout)(self.consuming_function)
     755                 if self._is_consuming_function_use_multi_params:  # 消费函数使用传统的多参数形式
     756                     function_run(**delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))
     757                 else:
     758                     function_run(delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))  # 消费函数使用单个参数,参数自身是一个字典,由键值对表示各个参数。
     759                 self._confirm_consume(kw)
     760                 if self._do_task_filtering:
     761                     self._redis_filter.add_a_value(kw['body'])  # 函数执行成功后,添加函数的参数排序后的键值对字符串到set中。
     762 
     763                 self.logger.debug(f'{self._concurrent_mode_dispatcher.get_concurrent_info()}  函数 {self.consuming_function.__name__}  '
     764                                   f'第{current_retry_times + 1}次 运行, 正确了,函数运行时间是 {round(time.time() - t_start, 4)} 秒,入参是 【 {kw["body"]} 】')
     765             except Exception as e:
     766                 if isinstance(e, (PyMongoError, ExceptionForRequeue)):  # mongo经常维护备份时候插入不了或挂了,或者自己主动抛出一个ExceptionForRequeue类型的错误会重新入队,不受指定重试次数逇约束。
     767                     self.logger.critical(f'函数 [{self.consuming_function.__name__}] 中发生错误 {type(e)}  {e}')
     768                     return self._requeue(kw)
     769                 self.logger.error(f'函数 {self.consuming_function.__name__}  第{current_retry_times + 1}次发生错误,'
     770                                   f'函数运行时间是 {round(time.time() - t_start, 4)} 秒,
      入参是 【 {kw["body"]} 】   
     原因是 {type(e)} {e} ', exc_info=self._is_print_detail_exception)
     771                 self._run_consuming_function_with_confirm_and_retry(kw, current_retry_times + 1)
     772         else:
     773             self.logger.critical(f'函数 {self.consuming_function.__name__} 达到最大重试次数 {self._max_retry_times} 后,仍然失败, 入参是 【 {kw["body"]} 】')  # 错得超过指定的次数了,就确认消费了。
     774             self._confirm_consume(kw)
     775 
     776     @abc.abstractmethod
     777     def _confirm_consume(self, kw):
     778         """确认消费"""
     779         raise NotImplementedError
     780 
     781         # noinspection PyUnusedLocal
     782 
     783     def check_heartbeat_and_message_count(self):
     784         self._msg_num_in_broker = self.publisher_of_same_queue.get_message_count()
     785         if time.time() - self._last_timestamp_print_msg_num > 60:
     786             self.logger.info(f'[{self._queue_name}] 队列中还有 [{self._msg_num_in_broker}] 个任务')
     787             self._last_timestamp_print_msg_num = time.time()
     788         if self._msg_num_in_broker != 0:
     789             self._last_timestamp_when_has_task_in_queue = time.time()
     790         return self._msg_num_in_broker
     791 
     792     @abc.abstractmethod
     793     def _requeue(self, kw):
     794         """重新入队"""
     795         raise NotImplementedError
     796 
     797     def _submit_task(self, kw):
     798         if self._judge_is_daylight():
     799             self._requeue(kw)
     800             time.sleep(self.time_interval_for_check_do_not_run_time)
     801             return
     802         if self._msg_expire_senconds != 0 and time.time() - self._msg_expire_senconds > kw['body']['publish_time']:
     803             self.logger.warning(f'消息发布时戳是 {kw["body"]["publish_time"]} {kw["body"].get("publish_time_format", "")},距离现在 {round(time.time() - kw["body"]["publish_time"], 4)} 秒 ,'
     804                                 f'超过了指定的 {self._msg_expire_senconds} 秒,丢弃任务')
     805             self._confirm_consume(kw)
     806             return 0
     807         self.threadpool.submit(self._run_consuming_function_with_confirm_and_retry, kw)
     808         time.sleep(self._msg_schedule_time_intercal)
     809 
     810     def _judge_is_daylight(self):
     811         if self._is_do_not_run_by_specify_time_effect and self._do_not_run_by_specify_time[0] < time_util.DatetimeConverter().time_str < self._do_not_run_by_specify_time[1]:
     812             self.logger.warning(f'现在时间是 {time_util.DatetimeConverter()} ,现在时间是在 {self._do_not_run_by_specify_time} 之间,不运行')
     813             return True
     814 
     815     def __str__(self):
     816         return f'队列为 {self.queue_name} 函数为 {self.consuming_function} 的消费者'
     817 
     818 
     819 # noinspection PyProtectedMember
     820 class ConcurrentModeDispatcher(LoggerMixin):
     821     schedulal_thread_to_be_join = []
     822     concurrent_mode = None
     823     schedual_task_always_use_thread = False
     824 
     825     def __init__(self, consumerx: AbstractConsumer):
     826         self.consumer = consumerx
     827         if self.__class__.concurrent_mode is not None and self.consumer._concurrent_mode != self.__class__.concurrent_mode:
     828             raise ValueError('同一解释器中不可以设置两种并发类型')
     829         self._concurrent_mode = self.__class__.concurrent_mode = self.consumer._concurrent_mode
     830         concurrent_name = ''
     831         self.timeout_deco = None
     832         if self._concurrent_mode == 1:
     833             concurrent_name = 'thread'
     834             self.timeout_deco = decorators.timeout
     835         elif self._concurrent_mode == 2:
     836             concurrent_name = 'gevent'
     837             self.timeout_deco = gevent_timeout_deco
     838         elif self._concurrent_mode == 3:
     839             concurrent_name = 'evenlet'
     840             self.timeout_deco = evenlet_timeout_deco
     841         self.concurrent_name = concurrent_name
     842         self.logger.warning(f'{self.consumer} 设置并发模式为 {self.concurrent_name}')
     843 
     844     def build_pool(self):
     845         if self.consumer._threadpool:
     846             return self.consumer._threadpool
     847 
     848         pool_type = None  # 是按照ThreadpoolExecutor写的三个鸭子类,公有方法名和功能写成完全一致,可以互相替换。
     849         if self._concurrent_mode == 1:
     850             pool_type = CustomThreadPoolExecutor
     851             check_not_monkey()
     852         elif self._concurrent_mode == 2:
     853             pool_type = GeventPoolExecutor
     854             check_gevent_monkey_patch()
     855         elif self._concurrent_mode == 3:
     856             pool_type = CustomEventletPoolExecutor
     857             check_evenlet_monkey_patch()
     858         self.consumer._threadpool = self.consumer._specify_threadpool if self.consumer._specify_threadpool else pool_type(self.consumer._threads_num + 1)  # 单独加一个检测消息数量和心跳的线程
     859         self.logger.warning(f'{self.concurrent_name} {self.consumer._threadpool}')
     860         return self.consumer._threadpool
     861 
     862     def schedulal_task_with_no_block(self):
     863         if self.schedual_task_always_use_thread:
     864             t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
     865             self.__class__.schedulal_thread_to_be_join.append(t)
     866             t.start()
     867         else:
     868             if self._concurrent_mode == 1:
     869                 t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
     870                 self.__class__.schedulal_thread_to_be_join.append(t)
     871                 t.start()
     872             elif self._concurrent_mode == 2:
     873                 g = gevent.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
     874                 self.__class__.schedulal_thread_to_be_join.append(g)
     875             elif self._concurrent_mode == 3:
     876                 g = eventlet.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
     877                 self.__class__.schedulal_thread_to_be_join.append(g)
     878         atexit.register(self.join)
     879 
     880     @classmethod
     881     def join(cls):
     882         nb_print((cls.schedulal_thread_to_be_join, len(cls.schedulal_thread_to_be_join), '模式:', cls.concurrent_mode))
     883         if cls.schedual_task_always_use_thread:
     884             for t in cls.schedulal_thread_to_be_join:
     885                 nb_print(t)
     886                 t.join()
     887         else:
     888             if cls.concurrent_mode == 1:
     889                 for t in cls.schedulal_thread_to_be_join:
     890                     nb_print(t)
     891                     t.join()
     892             elif cls.concurrent_mode == 2:
     893                 # cls.logger.info()
     894                 nb_print(cls.schedulal_thread_to_be_join)
     895                 gevent.joinall(cls.schedulal_thread_to_be_join, raise_error=True, )
     896             elif cls.concurrent_mode == 3:
     897                 for g in cls.schedulal_thread_to_be_join:
     898                     # eventlet.greenthread.GreenThread.
     899                     nb_print(g)
     900                     g.wait()
     901 
     902     def get_concurrent_info(self):
     903         concurrent_info = ''
     904         if self._concurrent_mode == 1:
     905             concurrent_info = f'[{threading.current_thread()}  {threading.active_count()}]'
     906         elif self._concurrent_mode == 2:
     907             concurrent_info = f'[{gevent.getcurrent()}  {threading.active_count()}]'
     908         elif self._concurrent_mode == 3:
     909             # noinspection PyArgumentList
     910             concurrent_info = f'[{eventlet.getcurrent()}  {threading.active_count()}]'
     911         return concurrent_info
     912 
     913 
     914 def wait_for_possible_has_finish_all_tasks(queue_name: str, minutes: int, send_stop_to_broker=0, broker_kind: int = 0, ):
     915     """
     916       由于是异步消费,和存在队列一边被消费,一边在推送,或者还有结尾少量任务还在确认消费者实际还没彻底运行完成。  但有时候需要判断 所有任务,务是否完成,提供一个不精确的判断,要搞清楚原因和场景后再慎用。
     917     :param queue_name: 队列名字
     918     :param minutes: 连续多少分钟没任务就判断为消费已完成
     919     :param send_stop_to_broker :发送停止标志到中间件,这回导致消费退出循环调度。
     920     :param broker_kind: 中间件种类
     921     :return:
     922     """
     923     if minutes <= 1:
     924         raise ValueError('疑似完成任务,判断时间最少需要设置为2分钟内,最好是是10分钟')
     925     pb = get_publisher(queue_name, broker_kind=broker_kind)
     926     no_task_time = 0
     927     while 1:
     928         # noinspection PyBroadException
     929         try:
     930             message_count = pb.get_message_count()
     931         except Exception as e:
     932             nb_print(e)
     933             message_count = -1
     934         if message_count == 0:
     935             no_task_time += 30
     936         else:
     937             no_task_time = 0
     938         time.sleep(30)
     939         if no_task_time > minutes * 60:
     940             break
     941     if send_stop_to_broker:
     942         pb.publish({'stop': 1})
     943     pb.close()
     944 
     945 
     946 class RabbitmqConsumer(AbstractConsumer):
     947     """
     948     使用pika包实现的。
     949     """
     950     BROKER_KIND = 0
     951 
     952     def _shedual_task(self):
     953         channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()
     954         channel.queue_declare(queue=self._queue_name, durable=True)
     955         channel.basic_qos(prefetch_count=self._threads_num)
     956 
     957         def callback(ch, method, properties, body):
     958             body = body.decode()
     959             self.logger.debug(f'从rabbitmq的 [{self._queue_name}] 队列中 取出的消息是:  {body}')
     960             body = json.loads(body)
     961             kw = {'ch': ch, 'method': method, 'properties': properties, 'body': body}
     962             self._submit_task(kw)
     963 
     964         channel.basic_consume(callback,
     965                               queue=self._queue_name,
     966                               # no_ack=True
     967                               )
     968         channel.start_consuming()
     969 
     970     def _confirm_consume(self, kw):
     971         with self._lock_for_pika:
     972             try:
     973                 kw['ch'].basic_ack(delivery_tag=kw['method'].delivery_tag)  # 确认消费
     974             except pika.exceptions.AMQPError as e:
     975                 self.logger.error(f'pika确认消费失败  {e}')
     976 
     977     def _requeue(self, kw):
     978         with self._lock_for_pika:
     979             # ch.connection.add_callback_threadsafe(functools.partial(self.__ack_message_pika, ch, method.delivery_tag))
     980             return kw['ch'].basic_nack(delivery_tag=kw['method'].delivery_tag)  # 立即重新入队。
     981 
     982     @staticmethod
     983     def __ack_message_pika(channelx, delivery_tagx):
     984         """Note that `channel` must be the same pika channel instance via which
     985         the message being ACKed was retrieved (AMQP protocol constraint).
     986         """
     987         if channelx.is_open:
     988             channelx.basic_ack(delivery_tagx)
     989         else:
     990             # Channel is already closed, so we can't ACK this message;
     991             # log and/or do something that makes sense for your app in this case.
     992             pass
     993 
     994 
     995 class RabbitmqConsumerAmqpStorm(AbstractConsumer):
     996     """
     997     使用AmqpStorm实现的,多线程安全的,不用加锁。
     998     """
     999     BROKER_KIND = 4
    1000 
    1001     def _shedual_task(self):
    1002         # noinspection PyTypeChecker
    1003         def callback(amqpstorm_message: amqpstorm.Message):
    1004             body = amqpstorm_message.body
    1005             self.logger.debug(f'从rabbitmq的 [{self._queue_name}] 队列中 取出的消息是:  {body}')
    1006             body = json.loads(body)
    1007             kw = {'amqpstorm_message': amqpstorm_message, 'body': body}
    1008             self._submit_task(kw)
    1009 
    1010         rp = RabbitmqPublisherUsingAmqpStorm(self.queue_name)
    1011         rp.init_broker()
    1012         rp.channel_wrapper_by_ampqstormbaic.qos(self._threads_num)
    1013         rp.channel_wrapper_by_ampqstormbaic.consume(callback=callback, queue=self.queue_name, no_ack=False)
    1014         rp.channel.start_consuming(auto_decode=True)
    1015 
    1016     def _confirm_consume(self, kw):
    1017         # noinspection PyBroadException
    1018         try:
    1019             kw['amqpstorm_message'].ack()  # 确认消费
    1020         except Exception as e:
    1021             self.logger.error(f'AmqpStorm确认消费失败  {type(e)} {e}')
    1022 
    1023     def _requeue(self, kw):
    1024         kw['amqpstorm_message'].nack(requeue=True)
    1025 
    1026 
    1027 class RabbitmqConsumerRabbitpy(AbstractConsumer):
    1028     """
    1029     使用rabbitpy实现的
    1030     """
    1031     BROKER_KIND = 1
    1032 
    1033     def _shedual_task(self):
    1034         # noinspection PyTypeChecker
    1035         channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type:  rabbitpy.AMQP         #
    1036         channel.queue_declare(queue=self._queue_name, durable=True)
    1037         channel.basic_qos(prefetch_count=self._threads_num)
    1038         for message in channel.basic_consume(self._queue_name, no_ack=False):
    1039             body = message.body.decode()
    1040             self.logger.debug(f'从rabbitmq {self._queue_name} 队列中 取出的消息是:  {body}')
    1041             kw = {'message': message, 'body': json.loads(message.body.decode())}
    1042             self._submit_task(kw)
    1043 
    1044     def _confirm_consume(self, kw):
    1045         kw['message'].ack()
    1046 
    1047     def _requeue(self, kw):
    1048         kw['message'].nack(requeue=True)
    1049 
    1050 
    1051 class RedisConsumer(AbstractConsumer, RedisMixin):
    1052     """
    1053     redis作为中间件实现的。
    1054     """
    1055     BROKER_KIND = 2
    1056 
    1057     def _shedual_task(self):
    1058         while True:
    1059             t_start = time.time()
    1060             task_bytes = self.redis_db7.blpop(self._queue_name)[1]  # 使用db7
    1061             if task_bytes:
    1062                 self.logger.debug(f'取出的任务时间是 {round(time.time() - t_start, 4)}    消息是:  {task_bytes.decode()}  ')
    1063                 task_dict = json.loads(task_bytes)
    1064                 kw = {'body': task_dict}
    1065                 self._submit_task(kw)
    1066 
    1067     def _confirm_consume(self, kw):
    1068         pass  # redis没有确认消费的功能。
    1069 
    1070     def _requeue(self, kw):
    1071         self.redis_db7.rpush(self._queue_name, json.dumps(kw['body']))
    1072 
    1073 
    1074 class MongoMqConsumer(AbstractConsumer, MongoMixin):
    1075     """
    1076     Mongo queue包实现的基于mongo的消息队列,支持消费确认。
    1077     """
    1078     BROKER_KIND = 5
    1079 
    1080     def _shedual_task(self):
    1081         mp = MongoMqPublisher(self.queue_name)
    1082         while True:
    1083             t_start = time.time()
    1084             job = mp.queue.next()
    1085             if job is not None:
    1086                 self.logger.debug(f'取出的任务时间是 {round(time.time() - t_start, 4)}    消息是:  {job.payload}  ')
    1087                 kw = {'body': job.payload, 'job': job}
    1088                 self._submit_task(kw)
    1089             else:
    1090                 time.sleep(self._msg_schedule_time_intercal)
    1091 
    1092     def _confirm_consume(self, kw):
    1093         kw['job'].complete()
    1094 
    1095     def _requeue(self, kw):
    1096         kw['job'].release()
    1097 
    1098 
    1099 class PersistQueueConsumer(AbstractConsumer):
    1100     """
    1101     persist queue包实现的本地持久化消息队列。
    1102     """
    1103     BROKER_KIND = 6
    1104 
    1105     def _shedual_task(self):
    1106         pub = PersistQueuePublisher(self.queue_name)
    1107         while True:
    1108             t_start = time.time()
    1109             item = pub.queue.get()
    1110             self.logger.debug(f'取出的任务时间是 {round(time.time() - t_start, 4)}    消息是:  {item}  ')
    1111             kw = {'body': json.loads(item), 'q': pub.queue, 'item': item}
    1112             self._submit_task(kw)
    1113 
    1114     def _confirm_consume(self, kw):
    1115         kw['q'].ack(kw['item'])
    1116 
    1117     def _requeue(self, kw):
    1118         kw['q'].nack(kw['item'])
    1119 
    1120 
    1121 class LocalPythonQueueConsumer(AbstractConsumer):
    1122     BROKER_KIND = 3
    1123 
    1124     @property
    1125     def local_python_queue(self) -> Queue:
    1126         return local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name]
    1127 
    1128     def _shedual_task(self):
    1129         while True:
    1130             t_start = time.time()
    1131             task = self.local_python_queue.get()
    1132             if isinstance(task, str):
    1133                 task = json.loads(task)
    1134             self.logger.debug(f'取出的任务时间是 {round(time.time() - t_start, 4)}    消息是:  {json.dumps(task)}  ')
    1135             task_dict = task
    1136             kw = {'body': task_dict}
    1137             self._submit_task(kw)
    1138 
    1139     def _confirm_consume(self, kw):
    1140         pass
    1141 
    1142     def _requeue(self, kw):
    1143         self.local_python_queue.put(kw['body'])
    1144 
    1145 
    1146 def get_publisher(queue_name, *, log_level_int=10, logger_prefix='', is_add_file_handler=False, clear_queue_within_init=False, is_add_publish_time=False, broker_kind=0):
    1147     """
    1148     :param queue_name:
    1149     :param log_level_int:
    1150     :param logger_prefix:
    1151     :param is_add_file_handler:
    1152     :param clear_queue_within_init:
    1153     :param is_add_publish_time:是否添加发布时间到中间件,如果设置了过期时间不为0,需要设为True
    1154     :param broker_kind: 中间件或使用包的种类。
    1155     :return:
    1156     """
    1157     all_kwargs = copy.deepcopy(locals())
    1158     all_kwargs.pop('broker_kind')
    1159     if broker_kind == 0:
    1160         return RabbitmqPublisher(**all_kwargs)
    1161     elif broker_kind == 1:
    1162         return RabbitmqPublisherUsingRabbitpy(**all_kwargs)
    1163     elif broker_kind == 2:
    1164         return RedisPublisher(**all_kwargs)
    1165     elif broker_kind == 3:
    1166         return LocalPythonQueuePublisher(**all_kwargs)
    1167     elif broker_kind == 4:
    1168         return RabbitmqPublisherUsingAmqpStorm(**all_kwargs)
    1169     elif broker_kind == 5:
    1170         return MongoMqPublisher(**all_kwargs)
    1171     elif broker_kind == 6:
    1172         return PersistQueuePublisher(**all_kwargs)
    1173     else:
    1174         raise ValueError('设置的中间件种类数字不正确')
    1175 
    1176 
    1177 def get_consumer(queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1,
    1178                  max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0,
    1179                  logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True,
    1180                  is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'),
    1181                  schedule_tasks_on_main_thread=False, broker_kind=0):
    1182     """
    1183     使用工厂模式再包一层,通过设置数字来生成基于不同中间件或包的consumer。
    1184     :param queue_name:
    1185     :param consuming_function: 处理消息的函数。
    1186     :param function_timeout : 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
    1187     :param threads_num:
    1188     :param specify_threadpool:使用指定的线程池,可以多个消费者共使用一个线程池,不为None时候。threads_num失效
    1189     :param concurrent_mode:并发模式,1线程 2gevent 3eventlet
    1190     :param max_retry_times:
    1191     :param log_level:
    1192     :param is_print_detail_exception:
    1193     :param msg_schedule_time_intercal:消息调度的时间间隔,用于控频
    1194     :param msg_expire_senconds:消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
    1195     :param logger_prefix: 日志前缀,可使不同的消费者生成不同的日志
    1196     :param create_logger_file : 是否创建文件日志
    1197     :param do_task_filtering :是否执行基于函数参数的任务过滤
    1198     :param is_consuming_function_use_multi_params  函数的参数是否是传统的多参数,不为单个body字典表示多个参数。
    1199     :param is_do_not_run_by_specify_time_effect :是否使不运行的时间段生效
    1200     :param do_not_run_by_specify_time   :不运行的时间段
    1201     :param schedule_tasks_on_main_thread :直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
    1202     :param broker_kind:中间件种类,,不要设置为1。 0 使用pika链接mq,2使用redis,3使用python内置Queue
    1203     :return
    1204     """
    1205     all_kwargs = copy.copy(locals())
    1206     all_kwargs.pop('broker_kind')
    1207     if broker_kind == 0:
    1208         return RabbitmqConsumer(**all_kwargs)
    1209     elif broker_kind == 1:
    1210         return RabbitmqConsumerRabbitpy(**all_kwargs)
    1211     elif broker_kind == 2:
    1212         return RedisConsumer(**all_kwargs)
    1213     elif broker_kind == 3:
    1214         return LocalPythonQueueConsumer(**all_kwargs)
    1215     elif broker_kind == 4:
    1216         return RabbitmqConsumerAmqpStorm(**all_kwargs)
    1217     elif broker_kind == 5:
    1218         return MongoMqConsumer(**all_kwargs)
    1219     elif broker_kind == 6:
    1220         return PersistQueueConsumer(**all_kwargs)
    1221     else:
    1222         raise ValueError('设置的中间件种类数字不正确')
    1223 
    1224 
    1225 # noinspection PyMethodMayBeStatic,PyShadowingNames
    1226 class _Test(unittest.TestCase, LoggerMixin, RedisMixin):
    1227     """
    1228     演示一个简单求和的例子。
    1229     """
    1230 
    1231     @unittest.skip
    1232     def test_publisher_with(self):
    1233         """
    1234         测试上下文管理器。
    1235         :return:
    1236         """
    1237         with RabbitmqPublisher('queue_test') as rp:
    1238             for i in range(1000):
    1239                 rp.publish(str(i))
    1240 
    1241     @unittest.skip
    1242     def test_publish_rabbit(self):
    1243         """
    1244         测试mq推送
    1245         :return:
    1246         """
    1247         rabbitmq_publisher = RabbitmqPublisher('queue_test', log_level_int=10, logger_prefix='yy平台推送')
    1248         rabbitmq_publisher.clear()
    1249         for i in range(500000):
    1250             try:
    1251                 time.sleep(1)
    1252                 rabbitmq_publisher.publish({'a': i, 'b': 2 * i})
    1253             except Exception as e:
    1254                 print(e)
    1255 
    1256         rabbitmq_publisher = RabbitmqPublisher('queue_test2', log_level_int=20, logger_prefix='zz平台推送')
    1257         rabbitmq_publisher.clear()
    1258         [rabbitmq_publisher.publish({'somestr_to_be_print': str(i)}) for i in range(500000)]
    1259 
    1260     @unittest.skip
    1261     def test_publish_redis(self):
    1262         # 如果需要批量推送
    1263         for i in range(10007):
    1264             # 最犀利的批量操作方式,自动聚合多条redis命令,支持多种redis混合命令批量操作。
    1265             RedisBulkWriteHelper(self.redis_db7, 1000).add_task(RedisOperation('lpush', 'queue_test', json.dumps({'a': i, 'b': 2 * i})))
    1266         [self.redis_db7.lpush('queue_test', json.dumps({'a': j, 'b': 2 * j})) for j in range(500)]
    1267         print('推送完毕')
    1268 
    1269     @unittest.skip
    1270     def test_consume(self):
    1271         """
    1272         单参数代表所有传参
    1273         :return:
    1274         """
    1275 
    1276         def f(body):
    1277             self.logger.info(f'消费此消息 {body}')
    1278             # print(body['a'] + body['b'])
    1279             time.sleep(5)  # 模拟做某事需要阻塞10秒种,必须用并发。
    1280 
    1281         # 把消费的函数名传给consuming_function,就这么简单。
    1282         rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20, msg_schedule_time_intercal=0.5, log_level=10, logger_prefix='yy平台消费',
    1283                                              is_consuming_function_use_multi_params=False)
    1284         rabbitmq_consumer.start_consuming_message()
    1285 
    1286     @unittest.skip
    1287     def test_consume2(self):
    1288         """
    1289         测试支持传统参数形式,不是用一个字典里面包含所有参数。
    1290         :return:
    1291         """
    1292 
    1293         def f2(a, b):
    1294             self.logger.debug(f'a的值是 {a}')
    1295             self.logger.debug(f'b的值是 {b}')
    1296             print(f'{a} + {b} 的和是  {a + b}')
    1297             time.sleep(3)  # 模拟做某事需要阻塞10秒种,必须用并发。
    1298 
    1299         # 把消费的函数名传给consuming_function,就这么简单。
    1300         RabbitmqConsumer('queue_test', consuming_function=f2, threads_num=60, msg_schedule_time_intercal=5, log_level=10, logger_prefix='yy平台消费', is_consuming_function_use_multi_params=True).start_consuming_message()
    1301 
    1302     @unittest.skip
    1303     def test_redis_filter(self):
    1304         """
    1305         测试基于redis set结构的过滤器。
    1306         :return:
    1307         """
    1308         redis_filter = RedisFilter('abcd')
    1309         redis_filter.add_a_value({'a': 1, 'c': 3, 'b': 2})
    1310         redis_filter.check_value_exists({'a': 1, 'c': 3, 'b': 2})
    1311         redis_filter.check_value_exists({'a': 1, 'b': 2, 'c': 3})
    1312         with decorators.TimerContextManager():
    1313             print(redis_filter.check_value_exists('{"a": 1, "b": 2, "c": 3}'))
    1314         with decorators.TimerContextManager():
    1315             # 实测百万元素的set,过滤检查不需要1毫秒,一般最多100万个酒店。
    1316             print(RedisFilter('filter:mafengwo-detail_task').check_value_exists({"_id": "69873340"}))
    1317 
    1318     @unittest.skip
    1319     def test_run_two_function(self):
    1320         # 演示连续运行两个consumer
    1321         def f3(a, b):
    1322             print(f'{a} + {b} = {a + b}')
    1323             time.sleep(10)  # 模拟做某事需要阻塞10秒种,必须用并发。
    1324 
    1325         def f4(somestr_to_be_print):
    1326             print(f'打印 {somestr_to_be_print}')
    1327             time.sleep(20)  # 模拟做某事需要阻塞10秒种,必须用并发。
    1328 
    1329         RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平台消费', is_consuming_function_use_multi_params=True).start_consuming_message()
    1330         RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平台消费', is_consuming_function_use_multi_params=True).start_consuming_message()
    1331         # AbstractConsumer.join_shedual_task_thread()
    1332 
    1333     @unittest.skip
    1334     def test_local_python_queue_as_broker(self):
    1335         def f8(x, y):
    1336             nb_print((x, y))
    1337 
    1338         consumer = get_consumer('queue_testlll', consuming_function=f8, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='uu平台消费',
    1339                                 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=5, broker_kind=3)  # 通过设置broker_kind,一键切换中间件为mq或redis
    1340         get_publisher('queue_testlll', broker_kind=3, is_add_publish_time=True).publish({'x': 3, 'y': 4})
    1341         consumer.publisher_of_same_queue.set_is_add_publish_time(True).publish({'x': 1, 'y': 2})
    1342         nb_print(consumer.publisher_of_same_queue.get_message_count())
    1343         consumer.start_consuming_message()
    1344         for i in range(10000):
    1345             consumer.publisher_of_same_queue.publish({'x': i, 'y': i * 2})
    1346             time.sleep(2)
    1347 
    1348     # @unittest.skip
    1349     def test_factory_pattern_consumer(self):
    1350         """
    1351         测试工厂模式来生成消费者
    1352         :return:
    1353         """
    1354 
    1355         def f2(a, b):
    1356             # body_dict = json.loads(body)
    1357 
    1358             self.logger.info(f'消费此消息 {a}  {b} ,结果是  {a + b}')
    1359             # print(body_dict['a'] + body_dict['b'])
    1360             time.sleep(30)  # 模拟做某事需要阻塞10秒种,必须用并发。
    1361             # 把消费的函数名传给consuming_function,就这么简单。
    1362 
    1363         consumer = get_consumer('queue_test5', consuming_function=f2, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='zz平台消费',
    1364                                 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=500, broker_kind=0)  # 通过设置broker_kind,一键切换中间件为mq或redis
    1365         consumer.publisher_of_same_queue.clear()
    1366         [consumer.publisher_of_same_queue.publish({'a': i, 'b': 2 * i}) for i in range(1)]
    1367         time.sleep(10)  # sleep测试消息过期。
    1368         get_publisher('queue_test5', broker_kind=0).set_is_add_publish_time().publish({'a': 1000, 'b': 2000})
    1369         consumer.start_consuming_message()
    1370         # consumer.join_shedual_task_thread()
    1371         # block_python_exit.just_block_python_exit()
    1372         # show_current_threads_num(block=True)
    1373 
    1374 
    1375 if __name__ == '__main__':
    1376     # noinspection PyArgumentList
    1377     unittest.main(sleep_time=1)

     gevent 并发模式:

     

    本地持久化队列,使用sqlite3模拟消息队列的图片。


    mongodb模拟的消息队列

    
    
  • 相关阅读:
    HDU1266 Reverse Number
    codevs1380 没有上司的舞会
    codevs1163 访问艺术馆
    codevs2144 砝码称重 2
    codevs1553 互斥的数
    codevs1230 元素查找
    codevs3118 高精度练习之除法
    codevs1245 最小的N个和
    codevs1063 合并果子
    codevs1052 地鼠游戏
  • 原文地址:https://www.cnblogs.com/ydf0509/p/11139471.html
Copyright © 2011-2022 走看看