Running pyspider fails with: SyntaxError: invalid syntax
Step 1: Rename the async variables
- async has been a reserved keyword since Python 3.7, so it can no longer be used as a parameter or variable name (a short demonstration follows below).
- In the Python files under /Library/anaconda3/lib/python3.7/site-packages/pyspider, rename async to async_mode (any other name works just as well). The patched files are reproduced below, and a script that automates the rename across all three files is sketched at the end of this step.
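To see the root cause in isolation, the snippet below compiles two tiny, made-up function definitions: one that still uses async as a parameter name and one that uses async_mode. On Python 3.7+ only the second one compiles; the first fails with exactly the error shown above.

```python
# Minimal demonstration that `async` is no longer a valid parameter name on Python 3.7+.
snippets = {
    "async":      "def fetcher(async=True): pass",
    "async_mode": "def fetcher(async_mode=True): pass",
}

for name, src in snippets.items():
    try:
        compile(src, "<demo>", "exec")
        print(f"{name}: compiles fine")
    except SyntaxError as exc:
        print(f"{name}: SyntaxError: {exc.msg}")
```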
- 1. run.py (simply replace every occurrence of async with async_mode; the full patched file follows):
#!/usr/bin/env python # -*- encoding: utf-8 -*- # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: # Author: Binux<i@binux.me> # http://binux.me # Created on 2014-03-05 00:11:49 import os import sys import six import copy import time import shutil import logging import logging.config import click import pyspider from pyspider.message_queue import connect_message_queue from pyspider.database import connect_database from pyspider.libs import utils def read_config(ctx, param, value): if not value: return {} import json def underline_dict(d): if not isinstance(d, dict): return d return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d)) config = underline_dict(json.load(value)) ctx.default_map = config return config def connect_db(ctx, param, value): if not value: return return utils.Get(lambda: connect_database(value)) def load_cls(ctx, param, value): if isinstance(value, six.string_types): return utils.load_object(value) return value def connect_rpc(ctx, param, value): if not value: return try: from six.moves import xmlrpc_client except ImportError: import xmlrpclib as xmlrpc_client return xmlrpc_client.ServerProxy(value, allow_none=True) @click.group(invoke_without_command=True) @click.option('-c', '--config', callback=read_config, type=click.File('r'), help='a json file with default values for subcommands. {"webui": {"port":5001}}') @click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"), help="logging config file for built-in python logging module", show_default=True) @click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode') @click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100, help='maxsize of queue') @click.option('--taskdb', envvar='TASKDB', callback=connect_db, help='database url for taskdb, default: sqlite') @click.option('--projectdb', envvar='PROJECTDB', callback=connect_db, help='database url for projectdb, default: sqlite') @click.option('--resultdb', envvar='RESULTDB', callback=connect_db, help='database url for resultdb, default: sqlite') @click.option('--message-queue', envvar='AMQP_URL', help='connection url to message queue, ' 'default: builtin multiprocessing.Queue') @click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. ' 'please use --message-queue instead.') @click.option('--beanstalk', envvar='BEANSTALK_HOST', help='[deprecated] beanstalk config for beanstalk queue. ' 'please use --message-queue instead.') @click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port") @click.option('--data-path', default='./data', help='data dir path') @click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True, help='add current working directory to python lib search path') @click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__) @click.pass_context def cli(ctx, **kwargs): """ A powerful spider system in python. 
""" if kwargs['add_sys_path']: sys.path.append(os.getcwd()) logging.config.fileConfig(kwargs['logging_config']) # get db from env for db in ('taskdb', 'projectdb', 'resultdb'): if kwargs[db] is not None: continue if os.environ.get('MYSQL_NAME'): kwargs[db] = utils.Get(lambda db=db: connect_database( 'sqlalchemy+mysql+%s://%s:%s/%s' % ( db, os.environ['MYSQL_PORT_3306_TCP_ADDR'], os.environ['MYSQL_PORT_3306_TCP_PORT'], db))) elif os.environ.get('MONGODB_NAME'): kwargs[db] = utils.Get(lambda db=db: connect_database( 'mongodb+%s://%s:%s/%s' % ( db, os.environ['MONGODB_PORT_27017_TCP_ADDR'], os.environ['MONGODB_PORT_27017_TCP_PORT'], db))) elif ctx.invoked_subcommand == 'bench': if kwargs['data_path'] == './data': kwargs['data_path'] += '/bench' shutil.rmtree(kwargs['data_path'], ignore_errors=True) os.mkdir(kwargs['data_path']) if db in ('taskdb', 'resultdb'): kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db))) elif db in ('projectdb', ): kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % ( db, os.path.join(os.path.dirname(__file__), 'libs/bench.py')))) else: if not os.path.exists(kwargs['data_path']): os.mkdir(kwargs['data_path']) kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % ( db, kwargs['data_path'], db[:-2]))) kwargs['is_%s_default' % db] = True # create folder for counter.dump if not os.path.exists(kwargs['data_path']): os.mkdir(kwargs['data_path']) # message queue, compatible with old version if kwargs.get('message_queue'): pass elif kwargs.get('amqp_url'): kwargs['message_queue'] = kwargs['amqp_url'] elif os.environ.get('RABBITMQ_NAME'): kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s" ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ) elif kwargs.get('beanstalk'): kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk'] for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher', 'fetcher2processor', 'processor2result'): if kwargs.get('message_queue'): kwargs[name] = utils.Get(lambda name=name: connect_message_queue( name, kwargs.get('message_queue'), kwargs['queue_maxsize'])) else: kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'), kwargs['queue_maxsize']) # phantomjs-proxy if kwargs.get('phantomjs_proxy'): pass elif os.environ.get('PHANTOMJS_NAME'): kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):] ctx.obj = utils.ObjectDict(ctx.obj or {}) ctx.obj['instances'] = [] ctx.obj.update(kwargs) if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'): ctx.invoke(all) return ctx @cli.command() @click.option('--xmlrpc/--no-xmlrpc', default=True) @click.option('--xmlrpc-host', default='0.0.0.0') @click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333) @click.option('--inqueue-limit', default=0, help='size limit of task queue for each project, ' 'tasks will been ignored when overflow') @click.option('--delete-time', default=24 * 60 * 60, help='delete time before marked as delete') @click.option('--active-tasks', default=100, help='active log size') @click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop') @click.option('--fail-pause-num', default=10, help='auto pause the project when last FAIL_PAUSE_NUM task failed, set 0 to disable') @click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls, help='scheduler class to be used.') @click.option('--threads', default=None, help='thread number for 
ThreadBaseScheduler, default: 4') @click.pass_context def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num, scheduler_cls, threads, get_object=False): """ Run Scheduler, only one scheduler is allowed. """ g = ctx.obj Scheduler = load_cls(None, None, scheduler_cls) kwargs = dict(taskdb=g.taskdb, projectdb=g.projectdb, resultdb=g.resultdb, newtask_queue=g.newtask_queue, status_queue=g.status_queue, out_queue=g.scheduler2fetcher, data_path=g.get('data_path', 'data')) if threads: kwargs['threads'] = int(threads) scheduler = Scheduler(**kwargs) scheduler.INQUEUE_LIMIT = inqueue_limit scheduler.DELETE_TIME = delete_time scheduler.ACTIVE_TASKS = active_tasks scheduler.LOOP_LIMIT = loop_limit scheduler.FAIL_PAUSE_NUM = fail_pause_num g.instances.append(scheduler) if g.get('testing_mode') or get_object: return scheduler if xmlrpc: utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host) scheduler.run() @cli.command() @click.option('--xmlrpc/--no-xmlrpc', default=False) @click.option('--xmlrpc-host', default='0.0.0.0') @click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444) @click.option('--poolsize', default=100, help="max simultaneous fetches") @click.option('--proxy', help="proxy host:port") @click.option('--user-agent', help='user agent') @click.option('--timeout', help='default fetch timeout') @click.option('--phantomjs-endpoint', help="endpoint of phantomjs, start via pyspider phantomjs") @click.option('--splash-endpoint', help="execute endpoint of splash: http://splash.readthedocs.io/en/stable/api.html#execute") @click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls, help='Fetcher class to be used.') @click.pass_context def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent, timeout, phantomjs_endpoint, splash_endpoint, fetcher_cls, async_mode=True, get_object=False, no_input=False): """ Run Fetcher. """ g = ctx.obj Fetcher = load_cls(None, None, fetcher_cls) if no_input: inqueue = None outqueue = None else: inqueue = g.scheduler2fetcher outqueue = g.fetcher2processor fetcher = Fetcher(inqueue=inqueue, outqueue=outqueue, poolsize=poolsize, proxy=proxy, async_mode=async_mode) fetcher.phantomjs_proxy = phantomjs_endpoint or g.phantomjs_proxy fetcher.splash_endpoint = splash_endpoint if user_agent: fetcher.user_agent = user_agent if timeout: fetcher.default_options = copy.deepcopy(fetcher.default_options) fetcher.default_options['timeout'] = timeout g.instances.append(fetcher) if g.get('testing_mode') or get_object: return fetcher if xmlrpc: utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host) fetcher.run() @cli.command() @click.option('--processor-cls', default='pyspider.processor.Processor', callback=load_cls, help='Processor class to be used.') @click.option('--process-time-limit', default=30, help='script process time limit') @click.pass_context def processor(ctx, processor_cls, process_time_limit, enable_stdout_capture=True, get_object=False): """ Run Processor. 
""" g = ctx.obj Processor = load_cls(None, None, processor_cls) processor = Processor(projectdb=g.projectdb, inqueue=g.fetcher2processor, status_queue=g.status_queue, newtask_queue=g.newtask_queue, result_queue=g.processor2result, enable_stdout_capture=enable_stdout_capture, process_time_limit=process_time_limit) g.instances.append(processor) if g.get('testing_mode') or get_object: return processor processor.run() @cli.command() @click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls, help='ResultWorker class to be used.') @click.pass_context def result_worker(ctx, result_cls, get_object=False): """ Run result worker. """ g = ctx.obj ResultWorker = load_cls(None, None, result_cls) result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result) g.instances.append(result_worker) if g.get('testing_mode') or get_object: return result_worker result_worker.run() @cli.command() @click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST', help='webui bind to host') @click.option('--port', default=5000, envvar='WEBUI_PORT', help='webui bind to host') @click.option('--cdn', default='//cdnjs.cloudflare.com/ajax/libs/', help='js/css cdn server') @click.option('--scheduler-rpc', help='xmlrpc path of scheduler') @click.option('--fetcher-rpc', help='xmlrpc path of fetcher') @click.option('--max-rate', type=float, help='max rate for each project') @click.option('--max-burst', type=float, help='max burst for each project') @click.option('--username', envvar='WEBUI_USERNAME', help='username of lock -ed projects') @click.option('--password', envvar='WEBUI_PASSWORD', help='password of lock -ed projects') @click.option('--need-auth', is_flag=True, default=False, help='need username and password') @click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls, help='webui Flask Application instance to be used.') @click.option('--process-time-limit', default=30, help='script process time limit in debug') @click.pass_context def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst, username, password, need_auth, webui_instance, process_time_limit, get_object=False): """ Run WebUI """ app = load_cls(None, None, webui_instance) g = ctx.obj app.config['taskdb'] = g.taskdb app.config['projectdb'] = g.projectdb app.config['resultdb'] = g.resultdb app.config['cdn'] = cdn if max_rate: app.config['max_rate'] = max_rate if max_burst: app.config['max_burst'] = max_burst if username: app.config['webui_username'] = username if password: app.config['webui_password'] = password app.config['need_auth'] = need_auth app.config['process_time_limit'] = process_time_limit # inject queues for webui for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher', 'fetcher2processor', 'processor2result'): app.config['queues'][name] = getattr(g, name, None) # fetcher rpc if isinstance(fetcher_rpc, six.string_types): import umsgpack fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc) app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data) else: # get fetcher instance for webui fetcher_config = g.config.get('fetcher', {}) webui_fetcher = ctx.invoke(fetcher, async_mode=False, get_object=True, no_input=True, **fetcher_config) app.config['fetch'] = lambda x: webui_fetcher.fetch(x) if isinstance(scheduler_rpc, six.string_types): scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc) if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'): app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % ( 
os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):])) elif scheduler_rpc is None: app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/') else: app.config['scheduler_rpc'] = scheduler_rpc app.debug = g.debug g.instances.append(app) if g.get('testing_mode') or get_object: return app app.run(host=host, port=port) @cli.command() @click.option('--phantomjs-path', default='phantomjs', help='phantomjs path') @click.option('--port', default=25555, help='phantomjs port') @click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed') @click.argument('args', nargs=-1) @click.pass_context def phantomjs(ctx, phantomjs_path, port, auto_restart, args): """ Run phantomjs fetcher if phantomjs is installed. """ args = args or ctx.default_map and ctx.default_map.get('args', []) import subprocess g = ctx.obj _quit = [] phantomjs_fetcher = os.path.join( os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js') cmd = [phantomjs_path, # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903 #'--load-images=false', '--ssl-protocol=any', '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)] try: _phantomjs = subprocess.Popen(cmd) except OSError: logging.warning('phantomjs not found, continue running without it.') return None def quit(*args, **kwargs): _quit.append(1) _phantomjs.kill() _phantomjs.wait() logging.info('phantomjs exited.') if not g.get('phantomjs_proxy'): g['phantomjs_proxy'] = '127.0.0.1:%s' % port phantomjs = utils.ObjectDict(port=port, quit=quit) g.instances.append(phantomjs) if g.get('testing_mode'): return phantomjs while True: _phantomjs.wait() if _quit or not auto_restart: break _phantomjs = subprocess.Popen(cmd) @cli.command() @click.option('--fetcher-num', default=1, help='instance num of fetcher') @click.option('--processor-num', default=1, help='instance num of processor') @click.option('--result-worker-num', default=1, help='instance num of result worker') @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']), help='run each components in thread or subprocess. 
' 'always using thread for windows.') @click.pass_context def all(ctx, fetcher_num, processor_num, result_worker_num, run_in): """ Run all the components in subprocess or thread """ ctx.obj['debug'] = False g = ctx.obj # FIXME: py34 cannot run components with threads if run_in == 'subprocess' and os.name != 'nt': run_in = utils.run_in_subprocess else: run_in = utils.run_in_thread threads = [] try: # phantomjs if not g.get('phantomjs_proxy'): phantomjs_config = g.config.get('phantomjs', {}) phantomjs_config.setdefault('auto_restart', True) threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config)) time.sleep(2) if threads[-1].is_alive() and not g.get('phantomjs_proxy'): g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555) # result worker result_worker_config = g.config.get('result_worker', {}) for i in range(result_worker_num): threads.append(run_in(ctx.invoke, result_worker, **result_worker_config)) # processor processor_config = g.config.get('processor', {}) for i in range(processor_num): threads.append(run_in(ctx.invoke, processor, **processor_config)) # fetcher fetcher_config = g.config.get('fetcher', {}) fetcher_config.setdefault('xmlrpc_host', '127.0.0.1') for i in range(fetcher_num): threads.append(run_in(ctx.invoke, fetcher, **fetcher_config)) # scheduler scheduler_config = g.config.get('scheduler', {}) scheduler_config.setdefault('xmlrpc_host', '127.0.0.1') threads.append(run_in(ctx.invoke, scheduler, **scheduler_config)) # running webui in main thread to make it exitable webui_config = g.config.get('webui', {}) webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/' % g.config.get('scheduler', {}).get('xmlrpc_port', 23333)) ctx.invoke(webui, **webui_config) finally: # exit components run in threading for each in g.instances: each.quit() # exit components run in subprocess for each in threads: if not each.is_alive(): continue if hasattr(each, 'terminate'): each.terminate() each.join() @cli.command() @click.option('--fetcher-num', default=1, help='instance num of fetcher') @click.option('--processor-num', default=2, help='instance num of processor') @click.option('--result-worker-num', default=1, help='instance num of result worker') @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']), help='run each components in thread or subprocess. ' 'always using thread for windows.') @click.option('--total', default=10000, help="total url in test page") @click.option('--show', default=20, help="show how many urls in a page") @click.option('--taskdb-bench', default=False, is_flag=True, help="only run taskdb bench test") @click.option('--message-queue-bench', default=False, is_flag=True, help="only run message queue bench test") @click.option('--all-bench', default=False, is_flag=True, help="only run all bench test") @click.pass_context def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show, taskdb_bench, message_queue_bench, all_bench): """ Run Benchmark test. In bench mode, in-memory sqlite database is used instead of on-disk sqlite database. 
""" from pyspider.libs import bench from pyspider.webui import bench_test # flake8: noqa ctx.obj['debug'] = False g = ctx.obj if result_worker_num == 0: g['processor2result'] = None if run_in == 'subprocess' and os.name != 'nt': run_in = utils.run_in_subprocess else: run_in = utils.run_in_thread all_test = not taskdb_bench and not message_queue_bench and not all_bench # test taskdb if all_test or taskdb_bench: bench.bench_test_taskdb(g.taskdb) # test message queue if all_test or message_queue_bench: bench.bench_test_message_queue(g.scheduler2fetcher) # test all if not all_test and not all_bench: return project_name = 'bench' def clear_project(): g.taskdb.drop(project_name) g.resultdb.drop(project_name) clear_project() # disable log logging.getLogger().setLevel(logging.ERROR) logging.getLogger('scheduler').setLevel(logging.ERROR) logging.getLogger('fetcher').setLevel(logging.ERROR) logging.getLogger('processor').setLevel(logging.ERROR) logging.getLogger('result').setLevel(logging.ERROR) logging.getLogger('webui').setLevel(logging.ERROR) logging.getLogger('werkzeug').setLevel(logging.ERROR) try: threads = [] # result worker result_worker_config = g.config.get('result_worker', {}) for i in range(result_worker_num): threads.append(run_in(ctx.invoke, result_worker, result_cls='pyspider.libs.bench.BenchResultWorker', **result_worker_config)) # processor processor_config = g.config.get('processor', {}) for i in range(processor_num): threads.append(run_in(ctx.invoke, processor, processor_cls='pyspider.libs.bench.BenchProcessor', **processor_config)) # fetcher fetcher_config = g.config.get('fetcher', {}) fetcher_config.setdefault('xmlrpc_host', '127.0.0.1') for i in range(fetcher_num): threads.append(run_in(ctx.invoke, fetcher, fetcher_cls='pyspider.libs.bench.BenchFetcher', **fetcher_config)) # webui webui_config = g.config.get('webui', {}) webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/' % g.config.get('scheduler', {}).get('xmlrpc_port', 23333)) threads.append(run_in(ctx.invoke, webui, **webui_config)) # scheduler scheduler_config = g.config.get('scheduler', {}) scheduler_config.setdefault('xmlrpc_host', '127.0.0.1') scheduler_config.setdefault('xmlrpc_port', 23333) threads.append(run_in(ctx.invoke, scheduler, scheduler_cls='pyspider.libs.bench.BenchScheduler', **scheduler_config)) scheduler_rpc = connect_rpc(ctx, None, 'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config) for _ in range(20): if utils.check_port_open(23333): break time.sleep(1) scheduler_rpc.newtask({ "project": project_name, "taskid": "on_start", "url": "data:,on_start", "fetch": { "save": {"total": total, "show": show} }, "process": { "callback": "on_start", }, }) # wait bench test finished while True: time.sleep(1) if scheduler_rpc.size() == 0: break finally: # exit components run in threading for each in g.instances: each.quit() # exit components run in subprocess for each in threads: if hasattr(each, 'terminate'): each.terminate() each.join(1) clear_project() @cli.command() @click.option('-i', '--interactive', default=False, is_flag=True, help='enable interactive mode, you can choose crawl url.') @click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True, help='enable phantomjs, will spawn a subprocess for phantomjs') @click.argument('scripts', nargs=-1) @click.pass_context def one(ctx, interactive, enable_phantomjs, scripts): """ One mode not only means all-in-one, it runs every thing in one process over tornado.ioloop, for debug purpose """ ctx.obj['debug'] = False g = ctx.obj 
g['testing_mode'] = True if scripts: from pyspider.database.local.projectdb import ProjectDB g['projectdb'] = ProjectDB(scripts) if g.get('is_taskdb_default'): g['taskdb'] = connect_database('sqlite+taskdb://') if g.get('is_resultdb_default'): g['resultdb'] = None if enable_phantomjs: phantomjs_config = g.config.get('phantomjs', {}) phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config) if phantomjs_obj: g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port) else: phantomjs_obj = None result_worker_config = g.config.get('result_worker', {}) if g.resultdb is None: result_worker_config.setdefault('result_cls', 'pyspider.result.OneResultWorker') result_worker_obj = ctx.invoke(result_worker, **result_worker_config) processor_config = g.config.get('processor', {}) processor_config.setdefault('enable_stdout_capture', False) processor_obj = ctx.invoke(processor, **processor_config) fetcher_config = g.config.get('fetcher', {}) fetcher_config.setdefault('xmlrpc', False) fetcher_obj = ctx.invoke(fetcher, **fetcher_config) scheduler_config = g.config.get('scheduler', {}) scheduler_config.setdefault('xmlrpc', False) scheduler_config.setdefault('scheduler_cls', 'pyspider.scheduler.OneScheduler') scheduler_obj = ctx.invoke(scheduler, **scheduler_config) scheduler_obj.init_one(ioloop=fetcher_obj.ioloop, fetcher=fetcher_obj, processor=processor_obj, result_worker=result_worker_obj, interactive=interactive) if scripts: for project in g.projectdb.projects: scheduler_obj.trigger_on_start(project) try: scheduler_obj.run() finally: scheduler_obj.quit() if phantomjs_obj: phantomjs_obj.quit() @cli.command() @click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler') @click.argument('project', nargs=1) @click.argument('message', nargs=1) @click.pass_context def send_message(ctx, scheduler_rpc, project, message): """ Send Message to project from command line """ if isinstance(scheduler_rpc, six.string_types): scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc) if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'): scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % ( os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):])) if scheduler_rpc is None: scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/') return scheduler_rpc.send_task({ 'taskid': utils.md5string('data:,on_message'), 'project': project, 'url': 'data:,on_message', 'fetch': { 'save': ('__command__', message), }, 'process': { 'callback': '_on_message', } }) def main(): cli() if __name__ == '__main__': main()
- 2. fetcher/tornado_fetcher.py (note: this file needs a bit more care; rename only the bare async variable, not identifiers from imported packages that merely contain the word "async", such as CurlAsyncHTTPClient). A small demo of that distinction is shown below, followed by the full patched file:
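A whole-word regular expression makes this distinction automatically. The two sample lines below are taken from the pre-patch tornado_fetcher.py; only the standalone identifier async is rewritten, while CurlAsyncHTTPClient is left untouched:

```python
import re

sample = (
    "from tornado.curl_httpclient import CurlAsyncHTTPClient\n"
    "def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async=True):"
)

# \b word boundaries match only the standalone identifier `async`, so class
# names that merely contain "Async" are not modified.
print(re.sub(r"\basync\b", "async_mode", sample))
```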
#!/usr/bin/env python # -*- encoding: utf-8 -*- # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: # Author: Binux<i@binux.me> # http://binux.me # Created on 2012-12-17 11:07:19 from __future__ import unicode_literals import os import sys import six import copy import time import json import logging import traceback import functools import threading import tornado.ioloop import tornado.httputil import tornado.httpclient import pyspider from six.moves import queue, http_cookies from six.moves.urllib.robotparser import RobotFileParser from requests import cookies from six.moves.urllib.parse import urljoin, urlsplit from tornado import gen from tornado.curl_httpclient import CurlAsyncHTTPClient from tornado.simple_httpclient import SimpleAsyncHTTPClient from pyspider.libs import utils, dataurl, counter from pyspider.libs.url import quote_chinese from .cookie_utils import extract_cookies_to_jar logger = logging.getLogger('fetcher') class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient): def free_size(self): return len(self._free_list) def size(self): return len(self._curls) - self.free_size() class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient): def free_size(self): return self.max_clients - self.size() def size(self): return len(self.active) fetcher_output = { "status_code": int, "orig_url": str, "url": str, "headers": dict, "content": str, "cookies": dict, } class Fetcher(object): user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__ default_options = { 'method': 'GET', 'headers': { }, 'use_gzip': True, 'timeout': 120, 'connect_timeout': 20, } phantomjs_proxy = None splash_endpoint = None splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read() robot_txt_age = 60*60 # 1h def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True): self.inqueue = inqueue self.outqueue = outqueue self.poolsize = poolsize self._running = False self._quit = False self.proxy = proxy self.async_mode = async_mode self.ioloop = tornado.ioloop.IOLoop() self.robots_txt_cache = {} # binding io_loop to http_client here if self.async_mode: self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize, io_loop=self.ioloop) else: self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize) self._cnt = { '5m': counter.CounterManager( lambda: counter.TimebaseAverageWindowCounter(30, 10)), '1h': counter.CounterManager( lambda: counter.TimebaseAverageWindowCounter(60, 60)), } def send_result(self, type, task, result): '''Send fetch result to processor''' if self.outqueue: try: self.outqueue.put((task, result)) except Exception as e: logger.exception(e) def fetch(self, task, callback=None): if self.async_mode: return self.async_mode_fetch(task, callback) else: return self.async_mode_fetch(task, callback).result() @gen.coroutine def async_mode_fetch(self, task, callback=None): '''Do one fetch''' url = task.get('url', 'data:,') if callback is None: callback = self.send_result type = 'None' start_time = time.time() try: if url.startswith('data:'): type = 'data' result = yield gen.maybe_future(self.data_fetch(url, task)) elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'): type = 'phantomjs' result = yield self.phantomjs_fetch(url, task) elif task.get('fetch', {}).get('fetch_type') in ('splash', ): type = 'splash' result = yield self.splash_fetch(url, task) else: type = 'http' result = yield self.http_fetch(url, task) except Exception as e: logger.exception(e) result = self.handle_error(type, url, task, 
start_time, e) callback(type, task, result) self.on_result(type, task, result) raise gen.Return(result) def sync_fetch(self, task): '''Synchronization fetch, usually used in xmlrpc thread''' if not self._running: return self.ioloop.run_sync(functools.partial(self.async_mode_fetch, task, lambda t, _, r: True)) wait_result = threading.Condition() _result = {} def callback(type, task, result): wait_result.acquire() _result['type'] = type _result['task'] = task _result['result'] = result wait_result.notify() wait_result.release() wait_result.acquire() self.ioloop.add_callback(self.fetch, task, callback) while 'result' not in _result: wait_result.wait() wait_result.release() return _result['result'] def data_fetch(self, url, task): '''A fake fetcher for dataurl''' self.on_fetch('data', task) result = {} result['orig_url'] = url result['content'] = dataurl.decode(url) result['headers'] = {} result['status_code'] = 200 result['url'] = url result['cookies'] = {} result['time'] = 0 result['save'] = task.get('fetch', {}).get('save') if len(result['content']) < 70: logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url) else: logger.info( "[200] %s:%s data:,%s...[content:%d] 0s", task.get('project'), task.get('taskid'), result['content'][:70], len(result['content']) ) return result def handle_error(self, type, url, task, start_time, error): result = { 'status_code': getattr(error, 'code', 599), 'error': utils.text(error), 'traceback': traceback.format_exc() if sys.exc_info()[0] else None, 'content': "", 'time': time.time() - start_time, 'orig_url': url, 'url': url, "save": task.get('fetch', {}).get('save') } logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'], task.get('project'), task.get('taskid'), url, error, result['time']) return result allowed_options = ['method', 'data', 'connect_timeout', 'timeout', 'cookies', 'use_gzip', 'validate_cert'] def pack_tornado_request_parameters(self, url, task): fetch = copy.deepcopy(self.default_options) fetch['url'] = url fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers']) fetch['headers']['User-Agent'] = self.user_agent task_fetch = task.get('fetch', {}) for each in self.allowed_options: if each in task_fetch: fetch[each] = task_fetch[each] fetch['headers'].update(task_fetch.get('headers', {})) if task.get('track'): track_headers = tornado.httputil.HTTPHeaders( task.get('track', {}).get('fetch', {}).get('headers') or {}) track_ok = task.get('track', {}).get('process', {}).get('ok', False) else: track_headers = {} track_ok = False # proxy proxy_string = None if isinstance(task_fetch.get('proxy'), six.string_types): proxy_string = task_fetch['proxy'] elif self.proxy and task_fetch.get('proxy', True): proxy_string = self.proxy if proxy_string: if '://' not in proxy_string: proxy_string = 'http://' + proxy_string proxy_splited = urlsplit(proxy_string) fetch['proxy_host'] = proxy_splited.hostname if proxy_splited.username: fetch['proxy_username'] = proxy_splited.username if proxy_splited.password: fetch['proxy_password'] = proxy_splited.password if six.PY2: for key in ('proxy_host', 'proxy_username', 'proxy_password'): if key in fetch: fetch[key] = fetch[key].encode('utf8') fetch['proxy_port'] = proxy_splited.port or 8080 # etag if task_fetch.get('etag', True): _t = None if isinstance(task_fetch.get('etag'), six.string_types): _t = task_fetch.get('etag') elif track_ok: _t = track_headers.get('etag') if _t and 'If-None-Match' not in fetch['headers']: fetch['headers']['If-None-Match'] = _t # last modifed if 
task_fetch.get('last_modified', task_fetch.get('last_modifed', True)): last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True)) _t = None if isinstance(last_modified, six.string_types): _t = last_modified elif track_ok: _t = track_headers.get('last-modified') if _t and 'If-Modified-Since' not in fetch['headers']: fetch['headers']['If-Modified-Since'] = _t # timeout if 'timeout' in fetch: fetch['request_timeout'] = fetch['timeout'] del fetch['timeout'] # data rename to body if 'data' in fetch: fetch['body'] = fetch['data'] del fetch['data'] return fetch @gen.coroutine def can_fetch(self, user_agent, url): parsed = urlsplit(url) domain = parsed.netloc if domain in self.robots_txt_cache: robot_txt = self.robots_txt_cache[domain] if time.time() - robot_txt.mtime() > self.robot_txt_age: robot_txt = None else: robot_txt = None if robot_txt is None: robot_txt = RobotFileParser() try: response = yield gen.maybe_future(self.http_client.fetch( urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30)) content = response.body except tornado.httpclient.HTTPError as e: logger.error('load robots.txt from %s error: %r', domain, e) content = '' try: content = content.decode('utf8', 'ignore') except UnicodeDecodeError: content = '' robot_txt.parse(content.splitlines()) self.robots_txt_cache[domain] = robot_txt raise gen.Return(robot_txt.can_fetch(user_agent, url)) def clear_robot_txt_cache(self): now = time.time() for domain, robot_txt in self.robots_txt_cache.items(): if now - robot_txt.mtime() > self.robot_txt_age: del self.robots_txt_cache[domain] @gen.coroutine def http_fetch(self, url, task): '''HTTP fetcher''' start_time = time.time() self.on_fetch('http', task) handle_error = lambda x: self.handle_error('http', url, task, start_time, x) # setup request parameters fetch = self.pack_tornado_request_parameters(url, task) task_fetch = task.get('fetch', {}) session = cookies.RequestsCookieJar() # fix for tornado request obj if 'Cookie' in fetch['headers']: c = http_cookies.SimpleCookie() try: c.load(fetch['headers']['Cookie']) except AttributeError: c.load(utils.utf8(fetch['headers']['Cookie'])) for key in c: session.set(key, c[key]) del fetch['headers']['Cookie'] if 'cookies' in fetch: session.update(fetch['cookies']) del fetch['cookies'] max_redirects = task_fetch.get('max_redirects', 5) # we will handle redirects by hand to capture cookies fetch['follow_redirects'] = False # making requests while True: # robots.txt if task_fetch.get('robots_txt', False): can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url']) if not can_fetch: error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt') raise gen.Return(handle_error(error)) try: request = tornado.httpclient.HTTPRequest(**fetch) # if cookie already in header, get_cookie_header wouldn't work old_cookie_header = request.headers.get('Cookie') if old_cookie_header: del request.headers['Cookie'] cookie_header = cookies.get_cookie_header(session, request) if cookie_header: request.headers['Cookie'] = cookie_header elif old_cookie_header: request.headers['Cookie'] = old_cookie_header except Exception as e: logger.exception(fetch) raise gen.Return(handle_error(e)) try: response = yield gen.maybe_future(self.http_client.fetch(request)) except tornado.httpclient.HTTPError as e: if e.response: response = e.response else: raise gen.Return(handle_error(e)) extract_cookies_to_jar(session, response.request, response.headers) if (response.code in (301, 302, 303, 307) and 
response.headers.get('Location') and task_fetch.get('allow_redirects', True)): if max_redirects <= 0: error = tornado.httpclient.HTTPError( 599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5), response) raise gen.Return(handle_error(error)) if response.code in (302, 303): fetch['method'] = 'GET' if 'body' in fetch: del fetch['body'] fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location'])) fetch['request_timeout'] -= time.time() - start_time if fetch['request_timeout'] < 0: fetch['request_timeout'] = 0.1 max_redirects -= 1 continue result = {} result['orig_url'] = url result['content'] = response.body or '' result['headers'] = dict(response.headers) result['status_code'] = response.code result['url'] = response.effective_url or url result['time'] = time.time() - start_time result['cookies'] = session.get_dict() result['save'] = task_fetch.get('save') if response.error: result['error'] = utils.text(response.error) if 200 <= response.code < 300: logger.info("[%d] %s:%s %s %.2fs", response.code, task.get('project'), task.get('taskid'), url, result['time']) else: logger.warning("[%d] %s:%s %s %.2fs", response.code, task.get('project'), task.get('taskid'), url, result['time']) raise gen.Return(result) @gen.coroutine def phantomjs_fetch(self, url, task): '''Fetch with phantomjs proxy''' start_time = time.time() self.on_fetch('phantomjs', task) handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x) # check phantomjs proxy is enabled if not self.phantomjs_proxy: result = { "orig_url": url, "content": "phantomjs is not enabled.", "headers": {}, "status_code": 501, "url": url, "time": time.time() - start_time, "cookies": {}, "save": task.get('fetch', {}).get('save') } logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url) raise gen.Return(result) # setup request parameters fetch = self.pack_tornado_request_parameters(url, task) task_fetch = task.get('fetch', {}) for each in task_fetch: if each not in fetch: fetch[each] = task_fetch[each] # robots.txt if task_fetch.get('robots_txt', False): user_agent = fetch['headers']['User-Agent'] can_fetch = yield self.can_fetch(user_agent, url) if not can_fetch: error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt') raise gen.Return(handle_error(error)) request_conf = { 'follow_redirects': False } request_conf['connect_timeout'] = fetch.get('connect_timeout', 20) request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1 session = cookies.RequestsCookieJar() if 'Cookie' in fetch['headers']: c = http_cookies.SimpleCookie() try: c.load(fetch['headers']['Cookie']) except AttributeError: c.load(utils.utf8(fetch['headers']['Cookie'])) for key in c: session.set(key, c[key]) del fetch['headers']['Cookie'] if 'cookies' in fetch: session.update(fetch['cookies']) del fetch['cookies'] request = tornado.httpclient.HTTPRequest(url=fetch['url']) cookie_header = cookies.get_cookie_header(session, request) if cookie_header: fetch['headers']['Cookie'] = cookie_header # making requests fetch['headers'] = dict(fetch['headers']) try: request = tornado.httpclient.HTTPRequest( url=self.phantomjs_proxy, method="POST", body=json.dumps(fetch), **request_conf) except Exception as e: raise gen.Return(handle_error(e)) try: response = yield gen.maybe_future(self.http_client.fetch(request)) except tornado.httpclient.HTTPError as e: if e.response: response = e.response else: raise gen.Return(handle_error(e)) if not response.body: raise 
gen.Return(handle_error(Exception('no response from phantomjs: %r' % response))) result = {} try: result = json.loads(utils.text(response.body)) assert 'status_code' in result, result except Exception as e: if response.error: result['error'] = utils.text(response.error) raise gen.Return(handle_error(e)) if result.get('status_code', 200): logger.info("[%d] %s:%s %s %.2fs", result['status_code'], task.get('project'), task.get('taskid'), url, result['time']) else: logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'], task.get('project'), task.get('taskid'), url, result['content'], result['time']) raise gen.Return(result) @gen.coroutine def splash_fetch(self, url, task): '''Fetch with splash''' start_time = time.time() self.on_fetch('splash', task) handle_error = lambda x: self.handle_error('splash', url, task, start_time, x) # check phantomjs proxy is enabled if not self.splash_endpoint: result = { "orig_url": url, "content": "splash is not enabled.", "headers": {}, "status_code": 501, "url": url, "time": time.time() - start_time, "cookies": {}, "save": task.get('fetch', {}).get('save') } logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url) raise gen.Return(result) # setup request parameters fetch = self.pack_tornado_request_parameters(url, task) task_fetch = task.get('fetch', {}) for each in task_fetch: if each not in fetch: fetch[each] = task_fetch[each] # robots.txt if task_fetch.get('robots_txt', False): user_agent = fetch['headers']['User-Agent'] can_fetch = yield self.can_fetch(user_agent, url) if not can_fetch: error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt') raise gen.Return(handle_error(error)) request_conf = { 'follow_redirects': False, 'headers': { 'Content-Type': 'application/json', } } request_conf['connect_timeout'] = fetch.get('connect_timeout', 20) request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1 session = cookies.RequestsCookieJar() if 'Cookie' in fetch['headers']: c = http_cookies.SimpleCookie() try: c.load(fetch['headers']['Cookie']) except AttributeError: c.load(utils.utf8(fetch['headers']['Cookie'])) for key in c: session.set(key, c[key]) del fetch['headers']['Cookie'] if 'cookies' in fetch: session.update(fetch['cookies']) del fetch['cookies'] request = tornado.httpclient.HTTPRequest(url=fetch['url']) cookie_header = cookies.get_cookie_header(session, request) if cookie_header: fetch['headers']['Cookie'] = cookie_header # making requests fetch['lua_source'] = self.splash_lua_source fetch['headers'] = dict(fetch['headers']) try: request = tornado.httpclient.HTTPRequest( url=self.splash_endpoint, method="POST", body=json.dumps(fetch), **request_conf) except Exception as e: raise gen.Return(handle_error(e)) try: response = yield gen.maybe_future(self.http_client.fetch(request)) except tornado.httpclient.HTTPError as e: if e.response: response = e.response else: raise gen.Return(handle_error(e)) if not response.body: raise gen.Return(handle_error(Exception('no response from phantomjs'))) result = {} try: result = json.loads(utils.text(response.body)) assert 'status_code' in result, result except ValueError as e: logger.error("result is not json: %r", response.body[:500]) raise gen.Return(handle_error(e)) except Exception as e: if response.error: result['error'] = utils.text(response.error) raise gen.Return(handle_error(e)) if result.get('status_code', 200): logger.info("[%d] %s:%s %s %.2fs", result['status_code'], task.get('project'), task.get('taskid'), url, result['time']) else: 
logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'], task.get('project'), task.get('taskid'), url, result['content'], result['time']) raise gen.Return(result) def run(self): '''Run loop''' logger.info("fetcher starting...") def queue_loop(): if not self.outqueue or not self.inqueue: return while not self._quit: try: if self.outqueue.full(): break if self.http_client.free_size() <= 0: break task = self.inqueue.get_nowait() # FIXME: decode unicode_obj should used after data selete from # database, it's used here for performance task = utils.decode_unicode_obj(task) self.fetch(task) except queue.Empty: break except KeyboardInterrupt: break except Exception as e: logger.exception(e) break tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start() tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start() self._running = True try: self.ioloop.start() except KeyboardInterrupt: pass logger.info("fetcher exiting...") def quit(self): '''Quit fetcher''' self._running = False self._quit = True self.ioloop.add_callback(self.ioloop.stop) if hasattr(self, 'xmlrpc_server'): self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop) self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop) def size(self): return self.http_client.size() def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False): '''Run xmlrpc server''' import umsgpack from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication try: from xmlrpc.client import Binary except ImportError: from xmlrpclib import Binary application = WSGIXMLRPCApplication() application.register_function(self.quit, '_quit') application.register_function(self.size) def sync_fetch(task): result = self.sync_fetch(task) result = Binary(umsgpack.packb(result)) return result application.register_function(sync_fetch, 'fetch') def dump_counter(_time, _type): return self._cnt[_time].to_dict(_type) application.register_function(dump_counter, 'counter') import tornado.wsgi import tornado.ioloop import tornado.httpserver container = tornado.wsgi.WSGIContainer(application) self.xmlrpc_ioloop = tornado.ioloop.IOLoop() self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop) self.xmlrpc_server.listen(port=port, address=bind) logger.info('fetcher.xmlrpc listening on %s:%s', bind, port) self.xmlrpc_ioloop.start() def on_fetch(self, type, task): '''Called before task fetch''' pass def on_result(self, type, task, result): '''Called after task fetched''' status_code = result.get('status_code', 599) if status_code != 599: status_code = (int(status_code) / 100 * 100) self._cnt['5m'].event((task.get('project'), status_code), +1) self._cnt['1h'].event((task.get('project'), status_code), +1) if type in ('http', 'phantomjs') and result.get('time'): content_len = len(result.get('content', '')) self._cnt['5m'].event((task.get('project'), 'speed'), float(content_len) / result.get('time')) self._cnt['1h'].event((task.get('project'), 'speed'), float(content_len) / result.get('time')) self._cnt['5m'].event((task.get('project'), 'time'), result.get('time')) self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
- 3. webui/app.py (simply replace every occurrence of async with async_mode; the full patched file follows):
#!/usr/bin/env python # -*- encoding: utf-8 -*- # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: # Author: Binux<i@binux.me> # http://binux.me # Created on 2014-02-22 23:17:13 import os import sys import logging logger = logging.getLogger("webui") from six import reraise from six.moves import builtins from six.moves.urllib.parse import urljoin from flask import Flask from pyspider.fetcher import tornado_fetcher if os.name == 'nt': import mimetypes mimetypes.add_type("text/css", ".css", True) class QuitableFlask(Flask): """Add quit() method to Flask object""" @property def logger(self): return logger def run(self, host=None, port=None, debug=None, **options): import tornado.wsgi import tornado.ioloop import tornado.httpserver import tornado.web if host is None: host = '127.0.0.1' if port is None: server_name = self.config['SERVER_NAME'] if server_name and ':' in server_name: port = int(server_name.rsplit(':', 1)[1]) else: port = 5000 if debug is not None: self.debug = bool(debug) hostname = host port = port application = self use_reloader = self.debug use_debugger = self.debug if use_debugger: from werkzeug.debug import DebuggedApplication application = DebuggedApplication(application, True) try: from .webdav import dav_app except ImportError as e: logger.warning('WebDav interface not enabled: %r', e) dav_app = None if dav_app: from werkzeug.wsgi import DispatcherMiddleware application = DispatcherMiddleware(application, { '/dav': dav_app }) container = tornado.wsgi.WSGIContainer(application) self.http_server = tornado.httpserver.HTTPServer(container) self.http_server.listen(port, hostname) if use_reloader: from tornado import autoreload autoreload.start() self.logger.info('webui running on %s:%s', hostname, port) self.ioloop = tornado.ioloop.IOLoop.current() self.ioloop.start() def quit(self): if hasattr(self, 'ioloop'): self.ioloop.add_callback(self.http_server.stop) self.ioloop.add_callback(self.ioloop.stop) self.logger.info('webui exiting...') app = QuitableFlask('webui', static_folder=os.path.join(os.path.dirname(__file__), 'static'), template_folder=os.path.join(os.path.dirname(__file__), 'templates')) app.secret_key = os.urandom(24) app.jinja_env.line_statement_prefix = '#' app.jinja_env.globals.update(builtins.__dict__) app.config.update({ 'fetch': lambda x: tornado_fetcher.Fetcher(None, None, async_mode=False).fetch(x), 'taskdb': None, 'projectdb': None, 'scheduler_rpc': None, 'queues': dict(), 'process_time_limit': 30, }) def cdn_url_handler(error, endpoint, kwargs): if endpoint == 'cdn': path = kwargs.pop('path') # cdn = app.config.get('cdn', 'http://cdn.staticfile.org/') # cdn = app.config.get('cdn', '//cdnjs.cloudflare.com/ajax/libs/') cdn = app.config.get('cdn', '//cdnjscn.b0.upaiyun.com/libs/') return urljoin(cdn, path) else: exc_type, exc_value, tb = sys.exc_info() if exc_value is error: reraise(exc_type, exc_value, tb) else: raise error app.handle_url_build_error = cdn_url_handler
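Rather than editing the three files by hand, the whole-word replacement can be scripted. The sketch below makes two assumptions: PYSPIDER_DIR must be adjusted to your own installation path, and the files are rewritten in place, so back them up first. After patching, each file is re-parsed to confirm that the original SyntaxError is gone.

```python
import ast
import re
from pathlib import Path

# Assumption: adjust this to your own installation (the Anaconda path used above).
PYSPIDER_DIR = Path("/Library/anaconda3/lib/python3.7/site-packages/pyspider")

# The three files patched in this step.
FILES = ["run.py", "fetcher/tornado_fetcher.py", "webui/app.py"]

for rel in FILES:
    path = PYSPIDER_DIR / rel
    source = path.read_text(encoding="utf-8")
    # Whole-word replacement: renames only the bare identifier `async`,
    # leaving names such as CurlAsyncHTTPClient untouched.
    patched = re.sub(r"\basync\b", "async_mode", source)
    path.write_text(patched, encoding="utf-8")
    # Re-parse to confirm the file is now valid Python 3.7+ syntax.
    ast.parse(patched)
    print(f"patched and re-parsed {rel}")
```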
Step 2: Modify line 209 of webdav.py
If pyspider then fails with ValueError: Invalid configuration: - Deprecated option 'domaincontroller': use 'http_authenticator.domain_controller' instead:
- The cause is that WsgiDAV released its pre-release 3.x series, which changed the expected configuration format.
- Fix:
- Locate the installed pyspider package (under the site-packages path above), open webui/webdav.py, and edit line 209; the snippet below prints the exact path of the file to edit.
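If you are unsure where the installed copy lives, this prints the path of the webdav.py file to edit (it only assumes that pyspider itself is importable):

```python
import os
import pyspider

# Absolute path of the webdav.py file referenced above.
print(os.path.join(os.path.dirname(pyspider.__file__), "webui", "webdav.py"))
```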
Change
'domaincontroller': NeedAuthController(app),
to:
'http_authenticator': {
    'HTTPAuthenticator': NeedAuthController(app),
},
Step 3: Downgrade WsgiDAV
python -m pip install wsgidav==2.4.1
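You can confirm the downgrade from Python; this assumes WsgiDAV exposes its version as wsgidav.__version__, which recent 2.x releases do:

```python
# Check which WsgiDAV version is actually importable now.
import wsgidav

print(wsgidav.__version__)  # should print 2.4.1 after the downgrade
```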
After that, run pyspider all again and the WebUI can be opened at http://localhost:5000.
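As a quick smoke test (run from a second terminal while pyspider all is running), the following checks that the WebUI answers on port 5000; expect a 200 status when authentication is not enabled:

```python
from urllib.request import urlopen

# Hit the WebUI started by `pyspider all`.
response = urlopen("http://localhost:5000", timeout=5)
print(response.status)
```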