  • scrapy_redis: updating score/request.priority with multiple threads

    0. Background

    When using a scrapy_redis spider, forgetting to set request.priority or setting it incorrectly (a Rule can also set request.priority via its process_request argument) leaves the item-yielding requests stuck at the tail of the sorted set xxx:requests, where they keep occupying memory.
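
    For reference, the priority can also be fixed at crawl time: scrapy_redis pushes each request into xxx:requests with score = -request.priority (see scrapy_redis/queue.py), so a larger priority means a smaller score and an earlier pop. Below is a minimal, illustrative sketch of raising the priority of item pages through a Rule's process_request hook; the spider name, link patterns and the value 12 are made up, and *args is used because, depending on the Scrapy version, process_request receives either the request alone or the request plus the response.

    from scrapy.linkextractors import LinkExtractor
    from scrapy.spiders import Rule
    from scrapy_redis.spiders import RedisCrawlSpider


    class DemoSpider(RedisCrawlSpider):
        name = 'demo'
        redis_key = 'demo:start_urls'
        rules = (
            # Detail pages that yield items: pop them first so they do not pile up.
            Rule(LinkExtractor(allow=r'apps/details'), callback='parse_item',
                 process_request='raise_priority'),
            # Listing pages keep the default priority of 0.
            Rule(LinkExtractor(allow=r'apps/'), follow=True),
        )

        def raise_priority(self, request, *args):
            # score = -priority in the requests SortedSet, so priority=12 -> score=-12
            return request.replace(priority=12)

        def parse_item(self, response):
            yield {'url': response.url}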

    1. Code implementation

    Iterate over every item of the SortedSet, regex-match the url inside each serialized request (data) against a predefined dictionary, write each item into a temporary newkey with its updated score, and finally RENAME newkey back to the original key.
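
    For context, scrapy_redis serializes each request into a SortedSet member as a pickled dict holding the url, the callback name and so on (its default serializer, picklecompat, is roughly pickle.dumps(request_to_dict(request, spider), protocol=-1)), which is why the script below can run url.*?keyword.*?callback patterns over the decoded bytes. A rough illustration with placeholder url and callback values:

    import pickle
    import re

    # Roughly what one member of xxx:requests looks like; a real member also
    # carries method, headers, meta, etc.
    member = pickle.dumps({'url': 'http://httpbin.org/', 'callback': 'parse_item'},
                          protocol=-1)

    text = member.decode('utf-8', 'replace')
    print(re.search(r'url.*?httpbin.*?callback', text))  # 'url', the address and 'callback' appear in order

    The full script: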

    # -*- coding: UTF-8 -*-
    import sys
    import re
    from multiprocessing.dummy import Pool as ThreadPool
    from functools import partial
    
    try:
       input = raw_input #For py2
    except NameError:
       pass
    
    import redis
    
    
    def print_line(string):
        print('\n{symbol}{space}{string}'.format(symbol='#'*10, space=' '*5, string=string))
    
    
    def check_key_scores(key):
        try:
            total = redis_server.zcard(key)
        except redis.exceptions.ResponseError:
            print("The value of '{key}' is not a SortedSet".format(key=key))
            sys.exit()
        except Exception as err:
            print(err)
            sys.exit()
    
        if total == 0:
            print("key '{key}' does not exist or has no items".format(key=key))
            sys.exit()
    
        __, min_score = redis_server.zrange(key, 0, 0, withscores=True)[0]
        __, max_score = redis_server.zrange(key, -1, -1, withscores=True)[0]
    
        print('score  amount')
        total_ = 0
        # Assuming that score/request.priority is an integer, rather than a float like 1.1
        for score in range(int(min_score), int(max_score)+1):
            count = redis_server.zcount(key, score, score)
            print(score, count)
            total_ += count
        print("{total_}/{total} items of key '{key}' have an integer priority".format(
                total_=total_, total=total_, key=key))
    
    
    def zadd_with_new_score(startstop, total_items):
        data, ori_score = redis_server.zrange(key, startstop, startstop, withscores=True)[0]
        for pattern, score in pattern_score:
            # data eg: b'\x80\x02}q\x00(X\x03\x00\x00\x00urlq\x01X\x13\x00\x00\x00http://httpbin.org/q\x02X\x08\x00\x00\x00callbackq\x03X\x
            # See /site-packages/scrapy_redis/queue.py
                # We don't use zadd method as the order of arguments change depending on
                # whether the class is Redis or StrictRedis, and the option of using
                # kwargs only accepts strings, not bytes.
            m = pattern.search(data.decode('utf-8', 'replace'))
            if m:
                redis_server.execute_command('ZADD', newkey, score, data)
                break
        else:
            redis_server.execute_command('ZADD', newkey, ori_score, data)
        print('{startstop} / {total_items}'.format(
                startstop=startstop+1, total_items=total_items))
    
    
    if __name__ == '__main__':
    
        password = 'password'
        host = '127.0.0.1'
        port = '6379'
        database_num = 0
    
        key = 'test:requests'
        newkey = 'temp'
        # Requests whose url matches any key of keyword_score are re-added with the corresponding value as their score
        # Smaller value/score means higher request.priority
        keyword_score = {'httpbin': -12, 'apps/details': 1}
        pattern_score = [(re.compile(r'url.*?%s.*?callback' % k), v) for (k, v) in keyword_score.items()]
        
        threads_amount = 10
    
        redis_server = redis.StrictRedis.from_url('redis://:{password}@{host}:{port}/{database_num}'.format(
                                                    password=password, host=host,
                                                    port=port, database_num=database_num))
    
    
        print_line('Step 0: pre check')
        check_key_scores(key)
    
    
        print_line('Step 1: copy items and update score')
        # total_items = redis_server.zlexcount(key, '-', '+')
        total_items = redis_server.zcard(key)
        input("Press Enter to copy {total_items} items of '{key}' into '{newkey}' with new score".format(
                total_items=total_items, key=key, newkey=newkey))
        p = ThreadPool(threads_amount)
        p.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
        p.close()   #Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
        p.join()    #Wait for the worker processes to exit. One must call close() or terminate() before using join().
    
        # For py3
        # https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments
        # with ThreadPool(threads_amount) as pool:
            # pool.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
        # print('zadd_with_new_score done')
    
    
        print_line('Step 2: check copy result')
        check_key_scores(key)
        check_key_scores(newkey)
    
    
        print_line('Step 3: delete, rename and check key')
        input("Press Enter to DELETE '{key}' and RENAME '{newkey}' to '{key}'".format(
                key=key, newkey=newkey))
        print(redis_server.delete(key))
        print(redis_server.rename(newkey, key))
        check_key_scores(key)
        check_key_scores(newkey)
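
    To spot-check that the targeted urls really moved to the head of the queue after the rename (the smallest scores are popped first), the first few members can be listed directly. A small optional snippet reusing redis_server and key from the script above; the slice size of 5 is arbitrary:

    # Peek at the 5 lowest-score (i.e. highest-priority) requests of the renamed key.
    for data, score in redis_server.zrange(key, 0, 4, withscores=True):
        print(score, data[:100])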

    2. Running results

  • Original article: https://www.cnblogs.com/my8100/p/scrapy_redis_update_score.html