zoukankan      html  css  js  c++  java
  • 使用redis原生list结构作为消息队列取代celery框架。

    1、web后台对大批量的繁重的io任务需要解耦使用分布式异步技术,否则会使接口阻塞,并发延迟,一般就选celery好了。此篇的取代主要是针对取代celery的worker模式。没有涉及到周期和定时模式。

     2、对我来说celery提供了  分布式,任务路由,超时杀死,任务过期丢弃,任务限速,并发模型选择,并发池大小这些功能。

    3、此篇除了并发模型固定为了线程模式,其余的特点都实现了。基本上的代码复用了之前使用celery框架的代码,只有任务调度变了,所以从celery改为自定义只花了3小时就改过来了。

    4、具体是先实现基本骨架,然后使用23种设计模式中的模板模式继承基类,实现其中一个方法。也就是原来被celery 的@app.task装饰的东西,现在改为了继承和重写基类方法。

     5、如果需要使用celery的进程工作模式,可以在import之后加一行 BoundedThreadPoolExecutor = ProcessPoolExecutor(需要先 from concurrent.futures import ProcessPoolExecutor),就能很容易换成进程模式了。

    如果需要使用celery的gevent工作模式,可以先 from gevent import monkey,然后在其他import之前调用 monkey.patch_all()。

    # -*- coding: utf-8 -*-
    # @Author  : ydf
    """
    用来取代celery框架的,
    改为使用自定义架构
    """
    import typing
    import abc
    import threading
    from multiprocessing import Process
    import json
    import time
    
    from app.utils_ydf import BoundedThreadPoolExecutor, RedisMixin, decorators, LoggerMixin, LogManager
    from app.apis.list_page_live_price.live_price_celery_app import live_price_deco, bulk_price_live_deco
    from app.constant import icon_list
    
    
    from app.apis.cnbooking.cnbooking_core import CnbookingHotelPriceQuerier, CnbookingHotelPriceQuerierInternational
    from app.apis.daolv.hotel_detail import query_hoteldetail_price
    # from app.apis.elong.elong_detail_priceinfo2 import detail_priceinfo
    from app.apis.elongin.elong_in_detail import elong
    from app.apis.haoqiao.core import search2
    from app.apis.jltour.jl_price import get_jl_tour_price
    from app.apis.qunar.core import getPrice_in, getPrice
    from app.apis.yingli.core import get_detial
    
    
    # from app.apis.expedia.expedia_hotel_price import get_expedia_price
    from app.apis.ctrip.ctriphotelm import ctripPriceIn, ctripPrice
    
    # 导入批量获取比价的函数
    from app.apis.elong.elong_cn_bulk_request import elong_cn_bulk_request_price
    from app.apis.jltour.jl_bulk_price_querier import JltourBulkPriceQuerier
    from app.apis.daolv.daolv_bulk_price_querier import DaolvBulkPriceQuerier
    
    # Redis list keys, one per platform queue. Each worker process started at
    # the bottom of this file is given one of these as `redis_list_key_name`.
    QUENEN_NAME_ELONG = 'compare.quenen.elong'
    QUENEN_NAME_QUNAR = 'compare.quenen.qunar'
    QUENEN_NAME_DAOLV = 'compare.quenen.daolv'
    QUENEN_NAME_HAOQIAO = 'compare.quenen.haoqiao'
    QUENEN_NAME_CNBOOKING = 'compare.quenen.cnbooking'
    QUENEN_NAME_PROFIT = 'compare.quenen.profit'
    QUENEN_NAME_JLTOUR = 'compare.quenen.jltour'
    QUENEN_NAME_CTRIP = 'compare.quenen.ctrip'
    QUENEN_NAME_ELONG_CN = 'compare.quenen.elong_cn'

    TASK_EXPIRE_TIME = 15  # Task expiry in seconds: a task consumed more than 15 s after it was submitted is not executed.
    TASK_TIMEOUT = 20  # Run-time limit for a task (function) after which it is killed automatically; presumably seconds - confirm against decorators.timeout.

    # NOTE(review): logger_redis is not referenced anywhere else in the visible part of this file.
    # The meaning of the positional argument 5 depends on LogManager - presumably a log level; confirm.
    logger_redis = LogManager('logger_redis').get_logger_and_add_handlers(5, is_add_stream_handler=False, log_filename='logger_redis.log')
    
    
    class BaseExecuor(RedisMixin, LoggerMixin):
        """
        Base class for single-hotel price-query workers.

        An instance polls one Redis list for tasks, drops tasks that are older
        than TASK_EXPIRE_TIME seconds, throttles submission and hands the
        remaining tasks to a bounded thread pool.  Subclasses provide the
        platform-specific work by overriding ``execute_specific_task``.

        The attributes ``redis_db_hotel``, ``logger`` and ``logger_with_file``
        are not defined here; presumably they come from RedisMixin and
        LoggerMixin - confirm against those classes.
        """

        def __init__(self, redis_list_key_name, thread_pool_nums, every_request_interval_time, platfrom_name):
            """
            :param redis_list_key_name: Redis list key that holds this platform's tasks
            :param thread_pool_nums: maximum size handed to the thread pool
            :param every_request_interval_time: seconds to sleep before a task is handed to the pool
                (rate limiting); also used as the idle and error back-off in ``start``
            :param platfrom_name: platform name, used in log messages
            """
            self._redis_list_key_name = redis_list_key_name
            self._thread_pool_nums = thread_pool_nums
            self._every_request_interval_time = every_request_interval_time
            self._platfrom_name = platfrom_name
            self._pool = BoundedThreadPoolExecutor(self._thread_pool_nums)
            self._t0 = time.time()  # time of the last "queue is empty" log line
            self._count_per_second = 0  # not used by any code visible in this file
            self._lock = threading.Lock()  # not used by any code visible in this file
            self.logger_with_file.debug(f'监听的队列是 {self._redis_list_key_name}')

        def _shedul_a_task(self, redis_task: str):
            """
            Parse one raw task and submit it to the pool if it is still worth running.

            The task is six fields joined by ``'@@'``: the hotel mapping as JSON,
            arrival date, departure date, number of adults, children string and
            the submit timestamp.  A task is skipped when it is older than
            TASK_EXPIRE_TIME seconds or when a lowest price is already cached
            for the hotel and query parameters.

            :param redis_task: decoded task string popped from the Redis list
            :raises ValueError: if the string does not split into exactly six
                fields or a numeric field cannot be converted
            """
            hotel_map_item, arrival_date, departure_date, adults, children_str, timestamp = redis_task.split('@@')
            hotel_map_item = json.loads(hotel_map_item)
            adults = int(adults)
            # An empty children field would put four '@' in a row and break the
            # split above, so '0' stands in for "no children".
            if children_str in (0, '0'):
                children_str = ''

            if not time.time() - float(timestamp) < TASK_EXPIRE_TIME:
                self.logger_with_file.warning(f'时间超过 {TASK_EXPIRE_TIME},放弃这个任务 {redis_task}')
                return

            self.logger_with_file.debug(f'未过期,执行这个任务 {redis_task} ')
            time.sleep(self._every_request_interval_time)  # throttle before touching the pool
            lowest_price_key = f'lowestprice_{hotel_map_item["_id"]}_{arrival_date}_{departure_date}_{adults}_{children_str}'
            # Only query the platform when no lowest price is stored under this key yet.
            if self.redis_db_hotel.exists(lowest_price_key):
                self.logger_with_file.warning(f'此马踏飞燕酒店 {hotel_map_item["_id"]} 已经有最低价了,此次不请求 {self._platfrom_name} 这个平台')
                return
            self._pool.submit(self.execute_specific_task, hotel_map_item, arrival_date, departure_date, adults, children_str)

        def start(self):
            """
            Poll the Redis list forever and schedule every task that is popped.

            This method never returns.  Any exception raised while popping or
            scheduling is logged and followed by a short sleep, so a single
            malformed task does not stop the worker.
            """
            while True:
                try:
                    time_redis_0 = time.time()
                    # The popped value holds hotel_map_item, arrival_date, departure_date, adults, children_str and timestamp.
                    redis_task_bytes = self.redis_db_hotel.rpop(self._redis_list_key_name)
                    if redis_task_bytes:
                        redis_task = redis_task_bytes.decode('utf8')
                        self.logger_with_file.debug(f'从 {self._redis_list_key_name} 键取出的内容是-->  {redis_task}  redis取出耗时 {time.time() - time_redis_0}')
                        self._shedul_a_task(redis_task)
                    else:
                        if time.time() - self._t0 > 5:  # log the idle state at most once every 5 seconds
                            self._t0 = time.time()
                            self.logger.debug(f'平台  {self._platfrom_name}  {self._redis_list_key_name} 队列中没有任务, redis耗时 {time.time() - time_redis_0}')
                        time.sleep(self._every_request_interval_time)
                except Exception as e:
                    self.logger_with_file.exception(e)
                    time.sleep(self._every_request_interval_time)

        @abc.abstractmethod
        def execute_specific_task(self, hotel_map_item_or_list: typing.Union[dict, list], arrival_date__, departure_date__, adults__, children_str__):
            """
            Run the platform-specific price query; must be overridden by subclasses.

            :param hotel_map_item_or_list: one hotel mapping dict, or a list of them for bulk workers
            :param arrival_date__: arrival date as taken from the task
            :param departure_date__: departure date as taken from the task
            :param adults__: number of adults
            :param children_str__: children string, ``''`` when there are none
            :raises NotImplementedError: always, in this base implementation
            """
            # NotImplemented is a comparison sentinel, not an exception; raising
            # it would surface as a TypeError instead of the intended error.
            # NOTE(review): abc.abstractmethod is only enforced when the class
            # uses an ABCMeta metaclass, which is not visible here - confirm.
            raise NotImplementedError
    
    
    class BaseBulkExcutor(BaseExecuor):
        """
        Base class for bulk (many hotels per task) price-query workers.

        Tasks are JSON objects rather than '@@'-joined strings; scheduling
        otherwise follows the same expiry and throttling rules as the
        single-hotel base class, without the cached-lowest-price check.
        """

        def execute_specific_task(self, hotel_map_item_or_list: typing.Union[dict, list], arrival_date__, departure_date__, adults__, children_str__):
            """Do nothing by default; concrete bulk workers override this."""

        def _shedul_a_task(self, redis_task: str):
            """
            Decode one JSON task and submit it to the pool unless it has expired.

            :param redis_task: JSON text with the keys ``id_list``, ``arrival_date``,
                ``departure_date``, ``adults``, ``children_str`` and ``timestamp``
            """
            # The name is deliberately rebound: the log lines below print the decoded dict.
            redis_task = json.loads(redis_task)
            hotel_map_item_list = redis_task['id_list']
            arrival_date = redis_task['arrival_date']
            departure_date = redis_task['departure_date']
            adults = redis_task['adults']
            children_str = redis_task['children_str']
            timestamp = redis_task['timestamp']

            adults = int(adults)
            # An empty value breaks the '@@'-split task format, so 0 is used in place of an empty string.
            if children_str in (0, '0'):
                children_str = ''

            still_fresh = time.time() - float(timestamp) < TASK_EXPIRE_TIME
            if not still_fresh:
                self.logger_with_file.warning(f'时间超过 {TASK_EXPIRE_TIME},放弃这个任务 {redis_task}')
                return

            self.logger_with_file.debug(f'未过期,执行这个任务 {redis_task} ')
            time.sleep(self._every_request_interval_time)  # throttle before submitting
            self._pool.submit(self.execute_specific_task, hotel_map_item_list, arrival_date, departure_date, adults, children_str)
    
    
    class QunarExecutor(BaseExecuor):
        """Single-hotel worker for the 'qunar' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """
            Query qunar for one hotel; the arguments are forwarded unchanged.

            Hotels whose ``_id`` starts with 'IN' go through ``getPrice_in``
            with occupancy details; all others go through ``getPrice``.
            """

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('qunar', icon_list.ICON_QUNAR)
            def qunar_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                is_in_hotel = hotel_map_item['_id'].startswith('IN')
                if not is_in_hotel:
                    return getPrice(hotel_map_item['qunar_id'], arrival_date, departure_date)

                # getPrice_in receives the ages '|'-separated plus an explicit child count.
                child_ages = children_str.replace(",", "|") if children_str else ''
                child_count = len(children_str.split(",")) if children_str else 0
                return getPrice_in(hotel_map_item['qunar_id'], arrival_date, departure_date, adults, child_count, child_ages)

            qunar_live(*args, **kwargs)
    
    
    class CnbookingExecutor(BaseExecuor):
        """Single-hotel worker for the 'cnbooking' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """
            Query cnbooking for one hotel; the arguments are forwarded unchanged.

            Hotels whose ``_id`` starts with 'IN' use the international
            querier class, all others the regular one.
            """

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('cnbooking', icon_list.ICON_LONGTENG)
            def cnbooking_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                if hotel_map_item['_id'].startswith('IN'):
                    querier_class = CnbookingHotelPriceQuerierInternational
                else:
                    querier_class = CnbookingHotelPriceQuerier
                querier = querier_class(hotel_map_item['cnbooking_id'], arrival_date, departure_date, adults, children_str)
                return querier.get_result()

            cnbooking_live(*args, **kwargs)
    
    
    class ElongExecutor(BaseExecuor):
        """Single-hotel worker for the 'elong' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """
            Query elong for one hotel; the arguments are forwarded unchanged.

            Only hotels whose ``_id`` starts with 'IN' are queried.  For every
            other hotel the inner function yields ``None``, because the
            per-hotel lookup for them is disabled in this file.
            """

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('elong', icon_list.ICON_MASHANGZHU)
            def elong_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                if hotel_map_item['_id'].startswith('IN'):
                    return elong(arrival_date, departure_date, hotel_map_item['elong_id'], adults, children_str)
                # Non-'IN' hotels: no single-hotel request is made here.
                return None

            elong_live(*args, **kwargs)
    
    
    class DaolvExecutor(BaseExecuor):
        """Single-hotel worker for the 'daolv' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query daolv for one hotel; the arguments are forwarded unchanged."""

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('daolv', icon_list.ICON_DAOLV)
            def daolv_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                """Domestic and international hotels share the same request."""
                daolv_hotel_id = hotel_map_item['daolv_id']
                return query_hoteldetail_price(daolv_hotel_id, arrival_date, departure_date, adults, children_str)

            daolv_live(*args, **kwargs)
    
    
    class JltourExecutor(BaseExecuor):
        """Single-hotel worker for the 'jltour' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query jltour for one hotel; the arguments are forwarded unchanged."""

            # noinspection PyUnusedLocal
            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('jltour', icon_list.ICON_JLTOUR)
            def jltour_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                """Domestic and international hotels share the same request; children_str is accepted but not forwarded."""
                jltour_hotel_id = hotel_map_item['jltour_id']
                return get_jl_tour_price(jltour_hotel_id, arrival_date, departure_date, adults)

            jltour_live(*args, **kwargs)
    
    
    class HaoqiaoExecutor(BaseExecuor):
        """Single-hotel worker for the 'haoqiao' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query haoqiao for one hotel; the arguments are forwarded unchanged."""

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('haoqiao', icon_list.ICON_HQ)
            def haoqiao_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                """Domestic and international hotels share the same request."""
                # The haoqiao mapping carries both a hotel id and a city id.
                haoqiao_ids = hotel_map_item['haoqiao_id']
                return search2(haoqiao_ids['hotel_id'], haoqiao_ids['city_id'], arrival_date, departure_date, children_str, adults)

            haoqiao_live(*args, **kwargs)
    
    
    class YingliExecutor(BaseExecuor):
        """Single-hotel worker for the 'yingli' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query yingli for one hotel; the arguments are forwarded unchanged."""

            # noinspection PyUnusedLocal,PyUnusedLocal
            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('yingli', icon_list.ICON_JUYOUHUI)
            def yingli_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                """Domestic and international hotels share the same request; adults and children_str are accepted but not forwarded."""
                yingli_hotel_id = hotel_map_item['yingli_id']
                return get_detial(yingli_hotel_id, arrival_date, departure_date)

            yingli_live(*args, **kwargs)
    
    
    class CtripExecutor(BaseExecuor):
        """Single-hotel worker for the 'ctrip' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """
            Query ctrip for one hotel; the arguments are forwarded unchanged.

            Hotels whose ``_id`` starts with 'IN' go through ``ctripPriceIn``
            with occupancy details; all others go through ``ctripPrice``.
            """

            @decorators.timeout(TASK_TIMEOUT)
            @live_price_deco('ctrip', icon_list.ICON_CTRIP)
            def ctrip_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
                if not hotel_map_item['_id'].startswith('IN'):
                    return ctripPrice(hotel_map_item['ctrip_id'], arrival_date, departure_date)
                return ctripPriceIn(hotel_map_item['ctrip_id'], arrival_date, departure_date, adults, children_str)

            ctrip_live(*args, **kwargs)
    
    
    # ###########################################################批量查询######################################################################################
    class JltourBulkExecutor(BaseBulkExcutor):
        """Bulk worker for the 'jltour' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query jltour for a list of hotels in one request; the arguments are forwarded unchanged."""

            @decorators.timeout(TASK_TIMEOUT)
            @bulk_price_live_deco(platform_name='jltour', platform_icon=icon_list.ICON_JLTOUR, platform_hotel_id_key='jltour_id')
            def jltour_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
                # Reduce every hotel mapping to its jltour id.
                hotel_id_list = [mapping['jltour_id'] for mapping in hotel_map_item_list]
                querier = JltourBulkPriceQuerier(hotel_id_list, arrival_date, departure_date, adults, children_str)
                return querier.get_result_list()

            jltour_bulk_request_price_live(*args, **kwargs)
    
    
    class ElongBulkExecutor(BaseBulkExcutor):
        """Bulk worker for the 'elong' platform."""

        def execute_specific_task(self, *args, **kwargs):
            """Query elong for a list of hotels in one request; the arguments are forwarded unchanged."""

            @decorators.timeout(TASK_TIMEOUT)
            @bulk_price_live_deco(platform_name='elong', platform_icon=icon_list.ICON_MASHANGZHU, platform_hotel_id_key='elong_id')
            def elong_cn_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
                # Reduce every hotel mapping to its elong id.
                hotel_id_list = [mapping['elong_id'] for mapping in hotel_map_item_list]
                return elong_cn_bulk_request_price(hotel_id_list, arrival_date, departure_date, adults, children_str)

            elong_cn_bulk_request_price_live(*args, **kwargs)
    
    
    class DaolvBulkExecutor(BaseBulkExcutor):
        """
        Bulk worker for the 'daolv' platform.

        Derives directly from the bulk base class, like the other bulk
        workers, so that it shares nothing with the jltour worker.
        """

        def execute_specific_task(self, *args, **kwargs):
            """Query daolv for a list of hotels in one request; the arguments are forwarded unchanged."""

            @decorators.timeout(TASK_TIMEOUT)
            @bulk_price_live_deco(platform_name='daolv', platform_icon=icon_list.ICON_DAOLV, platform_hotel_id_key='daolv_id')
            def daolv_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
                # Reduce every hotel mapping to its daolv id.
                hotel_id_list = [hotel_map_item['daolv_id'] for hotel_map_item in hotel_map_item_list]
                querier = DaolvBulkPriceQuerier(hotel_id_list, arrival_date, departure_date, adults, children_str)
                # The real-time flag is switched off explicitly before fetching;
                # its exact effect is defined by DaolvBulkPriceQuerier.
                querier.set_is_real_time(is_real_time=False)
                return querier.get_result_list()

            daolv_bulk_request_price_live(*args, **kwargs)
    
    
    def start_executor(**kwargs):
        """
        Build the worker that matches ``platfrom_name`` and run it.

        All keyword arguments are passed on to the worker's constructor, so
        they must be ``redis_list_key_name``, ``thread_pool_nums``,
        ``every_request_interval_time`` and ``platfrom_name``.  The call blocks
        for as long as the worker's ``start`` loop runs.

        :raises KeyError: if ``platfrom_name`` is not supplied
        :raises ValueError: if ``platfrom_name`` is not a known platform
        """
        platfrom_name = kwargs['platfrom_name']
        if platfrom_name == '去哪':
            executor_class = QunarExecutor
        elif platfrom_name == '龙腾':
            executor_class = CnbookingExecutor
        elif platfrom_name == '艺龙国际':
            executor_class = ElongExecutor
        elif platfrom_name == '道旅':
            executor_class = DaolvBulkExecutor
        elif platfrom_name == '捷旅':
            executor_class = JltourBulkExecutor
        elif platfrom_name == '好巧':
            executor_class = HaoqiaoExecutor
        elif platfrom_name == '盈利':
            executor_class = YingliExecutor
        elif platfrom_name == '携程':
            # Only select the class here; the single start call below runs it,
            # exactly as for every other platform.
            executor_class = CtripExecutor
        elif platfrom_name == '艺龙国内':
            executor_class = ElongBulkExecutor
        else:
            raise ValueError('平台名字设置不正确')
        executor_class(**kwargs).start()
    
    
    if __name__ == '__main__':
        # One worker process per platform queue:
        # (redis list key, thread pool size, seconds between submissions, platform name)
        worker_settings = (
            (QUENEN_NAME_QUNAR, 300, 0.02, '去哪'),
            (QUENEN_NAME_CNBOOKING, 300, 0.02, '龙腾'),
            (QUENEN_NAME_ELONG, 300, 0.15, '艺龙国际'),
            (QUENEN_NAME_DAOLV, 300, 0.02, '道旅'),
            (QUENEN_NAME_JLTOUR, 300, 0.1, '捷旅'),
            (QUENEN_NAME_HAOQIAO, 100, 0.5, '好巧'),
            (QUENEN_NAME_PROFIT, 300, 0.01, '盈利'),
            (QUENEN_NAME_CTRIP, 500, 0.01, '携程'),
            (QUENEN_NAME_ELONG_CN, 200, 0.15, '艺龙国内'),
        )
        for queue_key, pool_size, interval_seconds, platform in worker_settings:
            Process(
                target=start_executor,
                kwargs={
                    'redis_list_key_name': queue_key,
                    'thread_pool_nums': pool_size,
                    'every_request_interval_time': interval_seconds,
                    'platfrom_name': platform,
                },
            ).start()
  • 相关阅读:
    2015 11月30日 一周工作计划与执行
    2015 11月23日 一周工作计划与执行
    js 时间加减
    unix高级编程阅读
    2015 11月16日 一周工作计划与执行
    2015 11月9日 一周工作计划与执行
    python2与python3差异,以及如何写两者兼容代码
    property属性
    js刷新页面函数 location.reload()
    常用表单验证
  • 原文地址:https://www.cnblogs.com/ydf0509/p/9856028.html
Copyright © 2011-2022 走看看