  • Pitfalls when running "pyspider all" on Python 3.7

    When executed, it fails with: SyntaxError: invalid syntax

    Step 1: rename the variable

    • Starting with Python 3.7, async is a reserved keyword, so it can no longer be used as a function parameter name.
    • In the Python files under /Library/anaconda3/lib/python3.7/site-packages/pyspider, rename async to async_mode (any other name works as well). The fully edited files are below, and a helper script that automates the rename appears right after the run.py listing.
      • 1. run.py (simply replace every async with async_mode)
      • #!/usr/bin/env python
        # -*- encoding: utf-8 -*-
        # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
        # Author: Binux<i@binux.me>
        #         http://binux.me
        # Created on 2014-03-05 00:11:49
        
        
        import os
        import sys
        import six
        import copy
        import time
        import shutil
        import logging
        import logging.config
        
        import click
        import pyspider
        from pyspider.message_queue import connect_message_queue
        from pyspider.database import connect_database
        from pyspider.libs import utils
        
        
        def read_config(ctx, param, value):
            if not value:
                return {}
            import json
        
            def underline_dict(d):
                if not isinstance(d, dict):
                    return d
                return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d))
        
            config = underline_dict(json.load(value))
            ctx.default_map = config
            return config
        
        
        def connect_db(ctx, param, value):
            if not value:
                return
            return utils.Get(lambda: connect_database(value))
        
        
        def load_cls(ctx, param, value):
            if isinstance(value, six.string_types):
                return utils.load_object(value)
            return value
        
        
        def connect_rpc(ctx, param, value):
            if not value:
                return
            try:
                from six.moves import xmlrpc_client
            except ImportError:
                import xmlrpclib as xmlrpc_client
            return xmlrpc_client.ServerProxy(value, allow_none=True)
        
        
        @click.group(invoke_without_command=True)
        @click.option('-c', '--config', callback=read_config, type=click.File('r'),
                      help='a json file with default values for subcommands. {"webui": {"port":5001}}')
        @click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
                      help="logging config file for built-in python logging module", show_default=True)
        @click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
        @click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
                      help='maxsize of queue')
        @click.option('--taskdb', envvar='TASKDB', callback=connect_db,
                      help='database url for taskdb, default: sqlite')
        @click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
                      help='database url for projectdb, default: sqlite')
        @click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
                      help='database url for resultdb, default: sqlite')
        @click.option('--message-queue', envvar='AMQP_URL',
                      help='connection url to message queue, '
                      'default: builtin multiprocessing.Queue')
        @click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
                      'please use --message-queue instead.')
        @click.option('--beanstalk', envvar='BEANSTALK_HOST',
                      help='[deprecated] beanstalk config for beanstalk queue. '
                      'please use --message-queue instead.')
        @click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
        @click.option('--data-path', default='./data', help='data dir path')
        @click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
                      help='add current working directory to python lib search path')
        @click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
        @click.pass_context
        def cli(ctx, **kwargs):
            """
            A powerful spider system in python.
            """
            if kwargs['add_sys_path']:
                sys.path.append(os.getcwd())
        
            logging.config.fileConfig(kwargs['logging_config'])
        
            # get db from env
            for db in ('taskdb', 'projectdb', 'resultdb'):
                if kwargs[db] is not None:
                    continue
                if os.environ.get('MYSQL_NAME'):
                    kwargs[db] = utils.Get(lambda db=db: connect_database(
                        'sqlalchemy+mysql+%s://%s:%s/%s' % (
                            db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                            os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
                elif os.environ.get('MONGODB_NAME'):
                    kwargs[db] = utils.Get(lambda db=db: connect_database(
                        'mongodb+%s://%s:%s/%s' % (
                            db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                            os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
                elif ctx.invoked_subcommand == 'bench':
                    if kwargs['data_path'] == './data':
                        kwargs['data_path'] += '/bench'
                        shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                        os.mkdir(kwargs['data_path'])
                    if db in ('taskdb', 'resultdb'):
                        kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
                    elif db in ('projectdb', ):
                        kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % (
                            db, os.path.join(os.path.dirname(__file__), 'libs/bench.py'))))
                else:
                    if not os.path.exists(kwargs['data_path']):
                        os.mkdir(kwargs['data_path'])
                    kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                        db, kwargs['data_path'], db[:-2])))
                    kwargs['is_%s_default' % db] = True
        
            # create folder for counter.dump
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
        
            # message queue, compatible with old version
            if kwargs.get('message_queue'):
                pass
            elif kwargs.get('amqp_url'):
                kwargs['message_queue'] = kwargs['amqp_url']
            elif os.environ.get('RABBITMQ_NAME'):
                kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                           ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
            elif kwargs.get('beanstalk'):
                kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']
        
            for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                         'fetcher2processor', 'processor2result'):
                if kwargs.get('message_queue'):
                    kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                        name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
                else:
                    kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                         kwargs['queue_maxsize'])
        
            # phantomjs-proxy
            if kwargs.get('phantomjs_proxy'):
                pass
            elif os.environ.get('PHANTOMJS_NAME'):
                kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]
        
            ctx.obj = utils.ObjectDict(ctx.obj or {})
            ctx.obj['instances'] = []
            ctx.obj.update(kwargs)
        
            if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
                ctx.invoke(all)
            return ctx
        
        
        @cli.command()
        @click.option('--xmlrpc/--no-xmlrpc', default=True)
        @click.option('--xmlrpc-host', default='0.0.0.0')
        @click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
        @click.option('--inqueue-limit', default=0,
                      help='size limit of task queue for each project, '
                      'tasks will been ignored when overflow')
        @click.option('--delete-time', default=24 * 60 * 60,
                      help='delete time before marked as delete')
        @click.option('--active-tasks', default=100, help='active log size')
        @click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
        @click.option('--fail-pause-num', default=10, help='auto pause the project when last FAIL_PAUSE_NUM task failed, set 0 to disable')
        @click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
                      help='scheduler class to be used.')
        @click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
        @click.pass_context
        def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
                      inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num,
                      scheduler_cls, threads, get_object=False):
            """
            Run Scheduler, only one scheduler is allowed.
            """
            g = ctx.obj
            Scheduler = load_cls(None, None, scheduler_cls)
        
            kwargs = dict(taskdb=g.taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                          newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                          out_queue=g.scheduler2fetcher, data_path=g.get('data_path', 'data'))
            if threads:
                kwargs['threads'] = int(threads)
        
            scheduler = Scheduler(**kwargs)
            scheduler.INQUEUE_LIMIT = inqueue_limit
            scheduler.DELETE_TIME = delete_time
            scheduler.ACTIVE_TASKS = active_tasks
            scheduler.LOOP_LIMIT = loop_limit
            scheduler.FAIL_PAUSE_NUM = fail_pause_num
        
            g.instances.append(scheduler)
            if g.get('testing_mode') or get_object:
                return scheduler
        
            if xmlrpc:
                utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
            scheduler.run()
        
        
        @cli.command()
        @click.option('--xmlrpc/--no-xmlrpc', default=False)
        @click.option('--xmlrpc-host', default='0.0.0.0')
        @click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
        @click.option('--poolsize', default=100, help="max simultaneous fetches")
        @click.option('--proxy', help="proxy host:port")
        @click.option('--user-agent', help='user agent')
        @click.option('--timeout', help='default fetch timeout')
        @click.option('--phantomjs-endpoint', help="endpoint of phantomjs, start via pyspider phantomjs")
        @click.option('--splash-endpoint', help="execute endpoint of splash: http://splash.readthedocs.io/en/stable/api.html#execute")
        @click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
                      help='Fetcher class to be used.')
        @click.pass_context
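        # async is a keyword in Python 3.7+, so the parameter below (and every other
        # use of the old name in this file) has been renamed to async_mode.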
        def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
                    timeout, phantomjs_endpoint, splash_endpoint, fetcher_cls,
                    async_mode=True, get_object=False, no_input=False):
            """
            Run Fetcher.
            """
            g = ctx.obj
            Fetcher = load_cls(None, None, fetcher_cls)
        
            if no_input:
                inqueue = None
                outqueue = None
            else:
                inqueue = g.scheduler2fetcher
                outqueue = g.fetcher2processor
            fetcher = Fetcher(inqueue=inqueue, outqueue=outqueue,
                              poolsize=poolsize, proxy=proxy, async_mode=async_mode)
            fetcher.phantomjs_proxy = phantomjs_endpoint or g.phantomjs_proxy
            fetcher.splash_endpoint = splash_endpoint
            if user_agent:
                fetcher.user_agent = user_agent
            if timeout:
                fetcher.default_options = copy.deepcopy(fetcher.default_options)
                fetcher.default_options['timeout'] = timeout
        
            g.instances.append(fetcher)
            if g.get('testing_mode') or get_object:
                return fetcher
        
            if xmlrpc:
                utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
            fetcher.run()
        
        
        @cli.command()
        @click.option('--processor-cls', default='pyspider.processor.Processor',
                      callback=load_cls, help='Processor class to be used.')
        @click.option('--process-time-limit', default=30, help='script process time limit')
        @click.pass_context
        def processor(ctx, processor_cls, process_time_limit, enable_stdout_capture=True, get_object=False):
            """
            Run Processor.
            """
            g = ctx.obj
            Processor = load_cls(None, None, processor_cls)
        
            processor = Processor(projectdb=g.projectdb,
                                  inqueue=g.fetcher2processor, status_queue=g.status_queue,
                                  newtask_queue=g.newtask_queue, result_queue=g.processor2result,
                                  enable_stdout_capture=enable_stdout_capture,
                                  process_time_limit=process_time_limit)
        
            g.instances.append(processor)
            if g.get('testing_mode') or get_object:
                return processor
        
            processor.run()
        
        
        @cli.command()
        @click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
                      help='ResultWorker class to be used.')
        @click.pass_context
        def result_worker(ctx, result_cls, get_object=False):
            """
            Run result worker.
            """
            g = ctx.obj
            ResultWorker = load_cls(None, None, result_cls)
        
            result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result)
        
            g.instances.append(result_worker)
            if g.get('testing_mode') or get_object:
                return result_worker
        
            result_worker.run()
        
        
        @cli.command()
        @click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
                      help='webui bind to host')
        @click.option('--port', default=5000, envvar='WEBUI_PORT',
                      help='webui bind to host')
        @click.option('--cdn', default='//cdnjs.cloudflare.com/ajax/libs/',
                      help='js/css cdn server')
        @click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
        @click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
        @click.option('--max-rate', type=float, help='max rate for each project')
        @click.option('--max-burst', type=float, help='max burst for each project')
        @click.option('--username', envvar='WEBUI_USERNAME',
                      help='username of lock -ed projects')
        @click.option('--password', envvar='WEBUI_PASSWORD',
                      help='password of lock -ed projects')
        @click.option('--need-auth', is_flag=True, default=False, help='need username and password')
        @click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
                      help='webui Flask Application instance to be used.')
        @click.option('--process-time-limit', default=30, help='script process time limit in debug')
        @click.pass_context
        def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
                  username, password, need_auth, webui_instance, process_time_limit, get_object=False):
            """
            Run WebUI
            """
            app = load_cls(None, None, webui_instance)
        
            g = ctx.obj
            app.config['taskdb'] = g.taskdb
            app.config['projectdb'] = g.projectdb
            app.config['resultdb'] = g.resultdb
            app.config['cdn'] = cdn
        
            if max_rate:
                app.config['max_rate'] = max_rate
            if max_burst:
                app.config['max_burst'] = max_burst
            if username:
                app.config['webui_username'] = username
            if password:
                app.config['webui_password'] = password
            app.config['need_auth'] = need_auth
            app.config['process_time_limit'] = process_time_limit
        
            # inject queues for webui
            for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                         'fetcher2processor', 'processor2result'):
                app.config['queues'][name] = getattr(g, name, None)
        
            # fetcher rpc
            if isinstance(fetcher_rpc, six.string_types):
                import umsgpack
                fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
                app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
            else:
                # get fetcher instance for webui
                fetcher_config = g.config.get('fetcher', {})
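                # call site updated to pass the renamed async_mode parameter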
                webui_fetcher = ctx.invoke(fetcher, async_mode=False, get_object=True, no_input=True, **fetcher_config)
        
                app.config['fetch'] = lambda x: webui_fetcher.fetch(x)
        
            if isinstance(scheduler_rpc, six.string_types):
                scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
            if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
                app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
                    os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
            elif scheduler_rpc is None:
                app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
            else:
                app.config['scheduler_rpc'] = scheduler_rpc
        
            app.debug = g.debug
            g.instances.append(app)
            if g.get('testing_mode') or get_object:
                return app
        
            app.run(host=host, port=port)
        
        
        @cli.command()
        @click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
        @click.option('--port', default=25555, help='phantomjs port')
        @click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
        @click.argument('args', nargs=-1)
        @click.pass_context
        def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
            """
            Run phantomjs fetcher if phantomjs is installed.
            """
            args = args or ctx.default_map and ctx.default_map.get('args', [])
        
            import subprocess
            g = ctx.obj
            _quit = []
            phantomjs_fetcher = os.path.join(
                os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
            cmd = [phantomjs_path,
                   # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
                   #'--load-images=false',
                   '--ssl-protocol=any',
                   '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]
        
            try:
                _phantomjs = subprocess.Popen(cmd)
            except OSError:
                logging.warning('phantomjs not found, continue running without it.')
                return None
        
            def quit(*args, **kwargs):
                _quit.append(1)
                _phantomjs.kill()
                _phantomjs.wait()
                logging.info('phantomjs exited.')
        
            if not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % port
        
            phantomjs = utils.ObjectDict(port=port, quit=quit)
            g.instances.append(phantomjs)
            if g.get('testing_mode'):
                return phantomjs
        
            while True:
                _phantomjs.wait()
                if _quit or not auto_restart:
                    break
                _phantomjs = subprocess.Popen(cmd)
        
        
        @cli.command()
        @click.option('--fetcher-num', default=1, help='instance num of fetcher')
        @click.option('--processor-num', default=1, help='instance num of processor')
        @click.option('--result-worker-num', default=1,
                      help='instance num of result worker')
        @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
                      help='run each components in thread or subprocess. '
                      'always using thread for windows.')
        @click.pass_context
        def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
            """
            Run all the components in subprocess or thread
            """
        
            ctx.obj['debug'] = False
            g = ctx.obj
        
            # FIXME: py34 cannot run components with threads
            if run_in == 'subprocess' and os.name != 'nt':
                run_in = utils.run_in_subprocess
            else:
                run_in = utils.run_in_thread
        
            threads = []
        
            try:
                # phantomjs
                if not g.get('phantomjs_proxy'):
                    phantomjs_config = g.config.get('phantomjs', {})
                    phantomjs_config.setdefault('auto_restart', True)
                    threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
                    time.sleep(2)
                    if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                        g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)
        
                # result worker
                result_worker_config = g.config.get('result_worker', {})
                for i in range(result_worker_num):
                    threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))
        
                # processor
                processor_config = g.config.get('processor', {})
                for i in range(processor_num):
                    threads.append(run_in(ctx.invoke, processor, **processor_config))
        
                # fetcher
                fetcher_config = g.config.get('fetcher', {})
                fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
                for i in range(fetcher_num):
                    threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))
        
                # scheduler
                scheduler_config = g.config.get('scheduler', {})
                scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
                threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))
        
                # running webui in main thread to make it exitable
                webui_config = g.config.get('webui', {})
                webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                        % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
                ctx.invoke(webui, **webui_config)
            finally:
                # exit components run in threading
                for each in g.instances:
                    each.quit()
        
                # exit components run in subprocess
                for each in threads:
                    if not each.is_alive():
                        continue
                    if hasattr(each, 'terminate'):
                        each.terminate()
                    each.join()
        
        
        @cli.command()
        @click.option('--fetcher-num', default=1, help='instance num of fetcher')
        @click.option('--processor-num', default=2, help='instance num of processor')
        @click.option('--result-worker-num', default=1, help='instance num of result worker')
        @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
                      help='run each components in thread or subprocess. '
                      'always using thread for windows.')
        @click.option('--total', default=10000, help="total url in test page")
        @click.option('--show', default=20, help="show how many urls in a page")
        @click.option('--taskdb-bench', default=False, is_flag=True,
                      help="only run taskdb bench test")
        @click.option('--message-queue-bench', default=False, is_flag=True,
                      help="only run message queue bench test")
        @click.option('--all-bench', default=False, is_flag=True,
                      help="only run all bench test")
        @click.pass_context
        def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
                  taskdb_bench, message_queue_bench, all_bench):
            """
            Run Benchmark test.
            In bench mode, in-memory sqlite database is used instead of on-disk sqlite database.
            """
            from pyspider.libs import bench
            from pyspider.webui import bench_test  # flake8: noqa
        
            ctx.obj['debug'] = False
            g = ctx.obj
            if result_worker_num == 0:
                g['processor2result'] = None
        
            if run_in == 'subprocess' and os.name != 'nt':
                run_in = utils.run_in_subprocess
            else:
                run_in = utils.run_in_thread
        
            all_test = not taskdb_bench and not message_queue_bench and not all_bench
        
            # test taskdb
            if all_test or taskdb_bench:
                bench.bench_test_taskdb(g.taskdb)
            # test message queue
            if all_test or message_queue_bench:
                bench.bench_test_message_queue(g.scheduler2fetcher)
            # test all
            if not all_test and not all_bench:
                return
        
            project_name = 'bench'
        
            def clear_project():
                g.taskdb.drop(project_name)
                g.resultdb.drop(project_name)
        
            clear_project()
        
            # disable log
            logging.getLogger().setLevel(logging.ERROR)
            logging.getLogger('scheduler').setLevel(logging.ERROR)
            logging.getLogger('fetcher').setLevel(logging.ERROR)
            logging.getLogger('processor').setLevel(logging.ERROR)
            logging.getLogger('result').setLevel(logging.ERROR)
            logging.getLogger('webui').setLevel(logging.ERROR)
            logging.getLogger('werkzeug').setLevel(logging.ERROR)
        
            try:
                threads = []
        
                # result worker
                result_worker_config = g.config.get('result_worker', {})
                for i in range(result_worker_num):
                    threads.append(run_in(ctx.invoke, result_worker,
                                          result_cls='pyspider.libs.bench.BenchResultWorker',
                                          **result_worker_config))
        
                # processor
                processor_config = g.config.get('processor', {})
                for i in range(processor_num):
                    threads.append(run_in(ctx.invoke, processor,
                                          processor_cls='pyspider.libs.bench.BenchProcessor',
                                          **processor_config))
        
                # fetcher
                fetcher_config = g.config.get('fetcher', {})
                fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
                for i in range(fetcher_num):
                    threads.append(run_in(ctx.invoke, fetcher,
                                          fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                          **fetcher_config))
        
                # webui
                webui_config = g.config.get('webui', {})
                webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                        % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
                threads.append(run_in(ctx.invoke, webui, **webui_config))
        
                # scheduler
                scheduler_config = g.config.get('scheduler', {})
                scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
                scheduler_config.setdefault('xmlrpc_port', 23333)
                threads.append(run_in(ctx.invoke, scheduler,
                                      scheduler_cls='pyspider.libs.bench.BenchScheduler',
                                      **scheduler_config))
                scheduler_rpc = connect_rpc(ctx, None,
                                            'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)
        
                for _ in range(20):
                    if utils.check_port_open(23333):
                        break
                    time.sleep(1)
        
                scheduler_rpc.newtask({
                    "project": project_name,
                    "taskid": "on_start",
                    "url": "data:,on_start",
                    "fetch": {
                        "save": {"total": total, "show": show}
                    },
                    "process": {
                        "callback": "on_start",
                    },
                })
        
                # wait bench test finished
                while True:
                    time.sleep(1)
                    if scheduler_rpc.size() == 0:
                        break
            finally:
                # exit components run in threading
                for each in g.instances:
                    each.quit()
        
                # exit components run in subprocess
                for each in threads:
                    if hasattr(each, 'terminate'):
                        each.terminate()
                    each.join(1)
        
                clear_project()
        
        
        @cli.command()
        @click.option('-i', '--interactive', default=False, is_flag=True,
                      help='enable interactive mode, you can choose crawl url.')
        @click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
                      help='enable phantomjs, will spawn a subprocess for phantomjs')
        @click.argument('scripts', nargs=-1)
        @click.pass_context
        def one(ctx, interactive, enable_phantomjs, scripts):
            """
            One mode not only means all-in-one, it runs every thing in one process over
            tornado.ioloop, for debug purpose
            """
        
            ctx.obj['debug'] = False
            g = ctx.obj
            g['testing_mode'] = True
        
            if scripts:
                from pyspider.database.local.projectdb import ProjectDB
                g['projectdb'] = ProjectDB(scripts)
                if g.get('is_taskdb_default'):
                    g['taskdb'] = connect_database('sqlite+taskdb://')
                if g.get('is_resultdb_default'):
                    g['resultdb'] = None
        
            if enable_phantomjs:
                phantomjs_config = g.config.get('phantomjs', {})
                phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
                if phantomjs_obj:
                    g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
            else:
                phantomjs_obj = None
        
            result_worker_config = g.config.get('result_worker', {})
            if g.resultdb is None:
                result_worker_config.setdefault('result_cls',
                                                'pyspider.result.OneResultWorker')
            result_worker_obj = ctx.invoke(result_worker, **result_worker_config)
        
            processor_config = g.config.get('processor', {})
            processor_config.setdefault('enable_stdout_capture', False)
            processor_obj = ctx.invoke(processor, **processor_config)
        
            fetcher_config = g.config.get('fetcher', {})
            fetcher_config.setdefault('xmlrpc', False)
            fetcher_obj = ctx.invoke(fetcher, **fetcher_config)
        
            scheduler_config = g.config.get('scheduler', {})
            scheduler_config.setdefault('xmlrpc', False)
            scheduler_config.setdefault('scheduler_cls',
                                        'pyspider.scheduler.OneScheduler')
            scheduler_obj = ctx.invoke(scheduler, **scheduler_config)
        
            scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                                   fetcher=fetcher_obj,
                                   processor=processor_obj,
                                   result_worker=result_worker_obj,
                                   interactive=interactive)
            if scripts:
                for project in g.projectdb.projects:
                    scheduler_obj.trigger_on_start(project)
        
            try:
                scheduler_obj.run()
            finally:
                scheduler_obj.quit()
                if phantomjs_obj:
                    phantomjs_obj.quit()
        
        
        @cli.command()
        @click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
        @click.argument('project', nargs=1)
        @click.argument('message', nargs=1)
        @click.pass_context
        def send_message(ctx, scheduler_rpc, project, message):
            """
            Send Message to project from command line
            """
            if isinstance(scheduler_rpc, six.string_types):
                scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
            if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
                scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
                    os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
            if scheduler_rpc is None:
                scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
        
            return scheduler_rpc.send_task({
                'taskid': utils.md5string('data:,on_message'),
                'project': project,
                'url': 'data:,on_message',
                'fetch': {
                    'save': ('__command__', message),
                },
                'process': {
                    'callback': '_on_message',
                }
            })
        
        
        def main():
            cli()
        
        if __name__ == '__main__':
            main()
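    Rather than editing every file by hand, the rename can also be scripted. The sketch below is not part of pyspider: PYSPIDER_DIR is the anaconda path used above and should be adjusted for your own install. It only replaces whole-word occurrences of async, so identifiers that merely contain the text (CurlAsyncHTTPClient, async_fetch and the like) are left alone; the result therefore fixes the SyntaxError but may differ slightly from the hand-edited listings in this post. Review the rewritten files before starting pyspider again.

        #!/usr/bin/env python
        # -*- encoding: utf-8 -*-
        # Helper sketch: rename the bare identifier `async` to `async_mode` in every
        # .py file under the installed pyspider package (Python 3.7+ only).
        import os
        import re

        # Assumption: adjust this path to your own site-packages/pyspider directory.
        PYSPIDER_DIR = '/Library/anaconda3/lib/python3.7/site-packages/pyspider'
        PATTERN = re.compile(r'\basync\b')  # whole word only

        for root, _, files in os.walk(PYSPIDER_DIR):
            for name in files:
                if not name.endswith('.py'):
                    continue
                path = os.path.join(root, name)
                with open(path, encoding='utf-8') as f:
                    source = f.read()
                new_source, count = PATTERN.subn('async_mode', source)
                if count:
                    with open(path, 'w', encoding='utf-8') as f:
                        f.write(new_source)
                    print('%s: %d replacement(s)' % (path, count))

    After running it, start pyspider all again; any remaining SyntaxError will point at the file that still needs a manual edit.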
      • 2. fetcher/tornado_fetcher.py (note: this file needs a bit more care: rename only the variables, and do not touch library identifiers that merely contain the text "async", such as CurlAsyncHTTPClient)
      • #!/usr/bin/env python
        # -*- encoding: utf-8 -*-
        # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
        # Author: Binux<i@binux.me>
        #         http://binux.me
        # Created on 2012-12-17 11:07:19
        
        from __future__ import unicode_literals
        
        import os
        import sys
        import six
        import copy
        import time
        import json
        import logging
        import traceback
        import functools
        import threading
        import tornado.ioloop
        import tornado.httputil
        import tornado.httpclient
        import pyspider
        
        from six.moves import queue, http_cookies
        from six.moves.urllib.robotparser import RobotFileParser
        from requests import cookies
        from six.moves.urllib.parse import urljoin, urlsplit
        from tornado import gen
        from tornado.curl_httpclient import CurlAsyncHTTPClient
        from tornado.simple_httpclient import SimpleAsyncHTTPClient
        
        from pyspider.libs import utils, dataurl, counter
        from pyspider.libs.url import quote_chinese
        from .cookie_utils import extract_cookies_to_jar
        logger = logging.getLogger('fetcher')
        
        
        class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):
        
            def free_size(self):
                return len(self._free_list)
        
            def size(self):
                return len(self._curls) - self.free_size()
        
        
        class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):
        
            def free_size(self):
                return self.max_clients - self.size()
        
            def size(self):
                return len(self.active)
        
        fetcher_output = {
            "status_code": int,
            "orig_url": str,
            "url": str,
            "headers": dict,
            "content": str,
            "cookies": dict,
        }
        
        
        class Fetcher(object):
            user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
            default_options = {
                'method': 'GET',
                'headers': {
                },
                'use_gzip': True,
                'timeout': 120,
                'connect_timeout': 20,
            }
            phantomjs_proxy = None
            splash_endpoint = None
            splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read()
            robot_txt_age = 60*60  # 1h
        
            def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
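                # async (a keyword from Python 3.7 on) is renamed to async_mode for
                # this argument, the attribute below, and everywhere else in the class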
                self.inqueue = inqueue
                self.outqueue = outqueue
        
                self.poolsize = poolsize
                self._running = False
                self._quit = False
                self.proxy = proxy
                self.async_mode = async_mode
                self.ioloop = tornado.ioloop.IOLoop()
        
                self.robots_txt_cache = {}
        
                # binding io_loop to http_client here
                if self.async_mode:
                    self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                             io_loop=self.ioloop)
                else:
                    self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)
        
                self._cnt = {
                    '5m': counter.CounterManager(
                        lambda: counter.TimebaseAverageWindowCounter(30, 10)),
                    '1h': counter.CounterManager(
                        lambda: counter.TimebaseAverageWindowCounter(60, 60)),
                }
        
            def send_result(self, type, task, result):
                '''Send fetch result to processor'''
                if self.outqueue:
                    try:
                        self.outqueue.put((task, result))
                    except Exception as e:
                        logger.exception(e)
        
            def fetch(self, task, callback=None):
                if self.async_mode:
                    return self.async_mode_fetch(task, callback)
                else:
                    return self.async_mode_fetch(task, callback).result()
        
            @gen.coroutine
            def async_mode_fetch(self, task, callback=None):
                '''Do one fetch'''
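                # renamed by the same async -> async_mode replace; fetch() and
                # sync_fetch() call it under this new name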
                url = task.get('url', 'data:,')
                if callback is None:
                    callback = self.send_result
        
                type = 'None'
                start_time = time.time()
                try:
                    if url.startswith('data:'):
                        type = 'data'
                        result = yield gen.maybe_future(self.data_fetch(url, task))
                    elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                        type = 'phantomjs'
                        result = yield self.phantomjs_fetch(url, task)
                    elif task.get('fetch', {}).get('fetch_type') in ('splash', ):
                        type = 'splash'
                        result = yield self.splash_fetch(url, task)
                    else:
                        type = 'http'
                        result = yield self.http_fetch(url, task)
                except Exception as e:
                    logger.exception(e)
                    result = self.handle_error(type, url, task, start_time, e)
        
                callback(type, task, result)
                self.on_result(type, task, result)
                raise gen.Return(result)
        
            def sync_fetch(self, task):
                '''Synchronization fetch, usually used in xmlrpc thread'''
                if not self._running:
                    return self.ioloop.run_sync(functools.partial(self.async_mode_fetch, task, lambda t, _, r: True))
        
                wait_result = threading.Condition()
                _result = {}
        
                def callback(type, task, result):
                    wait_result.acquire()
                    _result['type'] = type
                    _result['task'] = task
                    _result['result'] = result
                    wait_result.notify()
                    wait_result.release()
        
                wait_result.acquire()
                self.ioloop.add_callback(self.fetch, task, callback)
                while 'result' not in _result:
                    wait_result.wait()
                wait_result.release()
                return _result['result']
        
            def data_fetch(self, url, task):
                '''A fake fetcher for dataurl'''
                self.on_fetch('data', task)
                result = {}
                result['orig_url'] = url
                result['content'] = dataurl.decode(url)
                result['headers'] = {}
                result['status_code'] = 200
                result['url'] = url
                result['cookies'] = {}
                result['time'] = 0
                result['save'] = task.get('fetch', {}).get('save')
                if len(result['content']) < 70:
                    logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
                else:
                    logger.info(
                        "[200] %s:%s data:,%s...[content:%d] 0s",
                        task.get('project'), task.get('taskid'),
                        result['content'][:70],
                        len(result['content'])
                    )
        
                return result
        
            def handle_error(self, type, url, task, start_time, error):
                result = {
                    'status_code': getattr(error, 'code', 599),
                    'error': utils.text(error),
                    'traceback': traceback.format_exc() if sys.exc_info()[0] else None,
                    'content': "",
                    'time': time.time() - start_time,
                    'orig_url': url,
                    'url': url,
                    "save": task.get('fetch', {}).get('save')
                }
                logger.error("[%d] %s:%s %s, %r %.2fs",
                             result['status_code'], task.get('project'), task.get('taskid'),
                             url, error, result['time'])
                return result
        
            allowed_options = ['method', 'data', 'connect_timeout', 'timeout', 'cookies', 'use_gzip', 'validate_cert']
        
            def pack_tornado_request_parameters(self, url, task):
                fetch = copy.deepcopy(self.default_options)
                fetch['url'] = url
                fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
                fetch['headers']['User-Agent'] = self.user_agent
                task_fetch = task.get('fetch', {})
                for each in self.allowed_options:
                    if each in task_fetch:
                        fetch[each] = task_fetch[each]
                fetch['headers'].update(task_fetch.get('headers', {}))
        
                if task.get('track'):
                    track_headers = tornado.httputil.HTTPHeaders(
                        task.get('track', {}).get('fetch', {}).get('headers') or {})
                    track_ok = task.get('track', {}).get('process', {}).get('ok', False)
                else:
                    track_headers = {}
                    track_ok = False
                # proxy
                proxy_string = None
                if isinstance(task_fetch.get('proxy'), six.string_types):
                    proxy_string = task_fetch['proxy']
                elif self.proxy and task_fetch.get('proxy', True):
                    proxy_string = self.proxy
                if proxy_string:
                    if '://' not in proxy_string:
                        proxy_string = 'http://' + proxy_string
                    proxy_splited = urlsplit(proxy_string)
                    fetch['proxy_host'] = proxy_splited.hostname
                    if proxy_splited.username:
                        fetch['proxy_username'] = proxy_splited.username
                    if proxy_splited.password:
                        fetch['proxy_password'] = proxy_splited.password
                    if six.PY2:
                        for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                            if key in fetch:
                                fetch[key] = fetch[key].encode('utf8')
                    fetch['proxy_port'] = proxy_splited.port or 8080
        
                # etag
                if task_fetch.get('etag', True):
                    _t = None
                    if isinstance(task_fetch.get('etag'), six.string_types):
                        _t = task_fetch.get('etag')
                    elif track_ok:
                        _t = track_headers.get('etag')
                    if _t and 'If-None-Match' not in fetch['headers']:
                        fetch['headers']['If-None-Match'] = _t
                # last modifed
                if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
                    last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
                    _t = None
                    if isinstance(last_modified, six.string_types):
                        _t = last_modified
                    elif track_ok:
                        _t = track_headers.get('last-modified')
                    if _t and 'If-Modified-Since' not in fetch['headers']:
                        fetch['headers']['If-Modified-Since'] = _t
                # timeout
                if 'timeout' in fetch:
                    fetch['request_timeout'] = fetch['timeout']
                    del fetch['timeout']
                # data rename to body
                if 'data' in fetch:
                    fetch['body'] = fetch['data']
                    del fetch['data']
        
                return fetch
        
            @gen.coroutine
            def can_fetch(self, user_agent, url):
                parsed = urlsplit(url)
                domain = parsed.netloc
                if domain in self.robots_txt_cache:
                    robot_txt = self.robots_txt_cache[domain]
                    if time.time() - robot_txt.mtime() > self.robot_txt_age:
                        robot_txt = None
                else:
                    robot_txt = None
        
                if robot_txt is None:
                    robot_txt = RobotFileParser()
                    try:
                        response = yield gen.maybe_future(self.http_client.fetch(
                            urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                        content = response.body
                    except tornado.httpclient.HTTPError as e:
                        logger.error('load robots.txt from %s error: %r', domain, e)
                        content = ''
        
                    try:
                        content = content.decode('utf8', 'ignore')
                    except UnicodeDecodeError:
                        content = ''
        
                    robot_txt.parse(content.splitlines())
                    self.robots_txt_cache[domain] = robot_txt
        
                raise gen.Return(robot_txt.can_fetch(user_agent, url))
        
            def clear_robot_txt_cache(self):
                now = time.time()
                for domain, robot_txt in self.robots_txt_cache.items():
                    if now - robot_txt.mtime() > self.robot_txt_age:
                        del self.robots_txt_cache[domain]
        
            @gen.coroutine
            def http_fetch(self, url, task):
                '''HTTP fetcher'''
                start_time = time.time()
                self.on_fetch('http', task)
                handle_error = lambda x: self.handle_error('http', url, task, start_time, x)
        
                # setup request parameters
                fetch = self.pack_tornado_request_parameters(url, task)
                task_fetch = task.get('fetch', {})
        
                session = cookies.RequestsCookieJar()
                # fix for tornado request obj
                if 'Cookie' in fetch['headers']:
                    c = http_cookies.SimpleCookie()
                    try:
                        c.load(fetch['headers']['Cookie'])
                    except AttributeError:
                        c.load(utils.utf8(fetch['headers']['Cookie']))
                    for key in c:
                        session.set(key, c[key])
                    del fetch['headers']['Cookie']
                if 'cookies' in fetch:
                    session.update(fetch['cookies'])
                    del fetch['cookies']
        
                max_redirects = task_fetch.get('max_redirects', 5)
                # we will handle redirects by hand to capture cookies
                fetch['follow_redirects'] = False
        
                # making requests
                while True:
                    # robots.txt
                    if task_fetch.get('robots_txt', False):
                        can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                        if not can_fetch:
                            error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                            raise gen.Return(handle_error(error))
        
                    try:
                        request = tornado.httpclient.HTTPRequest(**fetch)
                        # if cookie already in header, get_cookie_header wouldn't work
                        old_cookie_header = request.headers.get('Cookie')
                        if old_cookie_header:
                            del request.headers['Cookie']
                        cookie_header = cookies.get_cookie_header(session, request)
                        if cookie_header:
                            request.headers['Cookie'] = cookie_header
                        elif old_cookie_header:
                            request.headers['Cookie'] = old_cookie_header
                    except Exception as e:
                        logger.exception(fetch)
                        raise gen.Return(handle_error(e))
        
                    try:
                        response = yield gen.maybe_future(self.http_client.fetch(request))
                    except tornado.httpclient.HTTPError as e:
                        if e.response:
                            response = e.response
                        else:
                            raise gen.Return(handle_error(e))
        
                    extract_cookies_to_jar(session, response.request, response.headers)
                    if (response.code in (301, 302, 303, 307)
                            and response.headers.get('Location')
                            and task_fetch.get('allow_redirects', True)):
                        if max_redirects <= 0:
                            error = tornado.httpclient.HTTPError(
                                599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                                response)
                            raise gen.Return(handle_error(error))
                        if response.code in (302, 303):
                            fetch['method'] = 'GET'
                            if 'body' in fetch:
                                del fetch['body']
                        fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                        fetch['request_timeout'] -= time.time() - start_time
                        if fetch['request_timeout'] < 0:
                            fetch['request_timeout'] = 0.1
                        max_redirects -= 1
                        continue
        
                    result = {}
                    result['orig_url'] = url
                    result['content'] = response.body or ''
                    result['headers'] = dict(response.headers)
                    result['status_code'] = response.code
                    result['url'] = response.effective_url or url
                    result['time'] = time.time() - start_time
                    result['cookies'] = session.get_dict()
                    result['save'] = task_fetch.get('save')
                    if response.error:
                        result['error'] = utils.text(response.error)
                    if 200 <= response.code < 300:
                        logger.info("[%d] %s:%s %s %.2fs", response.code,
                                    task.get('project'), task.get('taskid'),
                                    url, result['time'])
                    else:
                        logger.warning("[%d] %s:%s %s %.2fs", response.code,
                                       task.get('project'), task.get('taskid'),
                                       url, result['time'])
        
                    raise gen.Return(result)
        
            @gen.coroutine
            def phantomjs_fetch(self, url, task):
                '''Fetch with phantomjs proxy'''
                start_time = time.time()
                self.on_fetch('phantomjs', task)
                handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)
        
                # check phantomjs proxy is enabled
                if not self.phantomjs_proxy:
                    result = {
                        "orig_url": url,
                        "content": "phantomjs is not enabled.",
                        "headers": {},
                        "status_code": 501,
                        "url": url,
                        "time": time.time() - start_time,
                        "cookies": {},
                        "save": task.get('fetch', {}).get('save')
                    }
                    logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
                    raise gen.Return(result)
        
                # setup request parameters
                fetch = self.pack_tornado_request_parameters(url, task)
                task_fetch = task.get('fetch', {})
                for each in task_fetch:
                    if each not in fetch:
                        fetch[each] = task_fetch[each]
        
                # robots.txt
                if task_fetch.get('robots_txt', False):
                    user_agent = fetch['headers']['User-Agent']
                    can_fetch = yield self.can_fetch(user_agent, url)
                    if not can_fetch:
                        error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                        raise gen.Return(handle_error(error))
        
                request_conf = {
                    'follow_redirects': False
                }
                request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
                request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1
        
                session = cookies.RequestsCookieJar()
                if 'Cookie' in fetch['headers']:
                    c = http_cookies.SimpleCookie()
                    try:
                        c.load(fetch['headers']['Cookie'])
                    except AttributeError:
                        c.load(utils.utf8(fetch['headers']['Cookie']))
                    for key in c:
                        session.set(key, c[key])
                    del fetch['headers']['Cookie']
                if 'cookies' in fetch:
                    session.update(fetch['cookies'])
                    del fetch['cookies']
        
                request = tornado.httpclient.HTTPRequest(url=fetch['url'])
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    fetch['headers']['Cookie'] = cookie_header
        
                # making requests
                fetch['headers'] = dict(fetch['headers'])
                try:
                    request = tornado.httpclient.HTTPRequest(
                        url=self.phantomjs_proxy, method="POST",
                        body=json.dumps(fetch), **request_conf)
                except Exception as e:
                    raise gen.Return(handle_error(e))
        
                try:
                    response = yield gen.maybe_future(self.http_client.fetch(request))
                except tornado.httpclient.HTTPError as e:
                    if e.response:
                        response = e.response
                    else:
                        raise gen.Return(handle_error(e))
        
                if not response.body:
                    raise gen.Return(handle_error(Exception('no response from phantomjs: %r' % response)))
        
                result = {}
                try:
                    result = json.loads(utils.text(response.body))
                    assert 'status_code' in result, result
                except Exception as e:
                    if response.error:
                        result['error'] = utils.text(response.error)
                    raise gen.Return(handle_error(e))
        
                if result.get('status_code', 200):
                    logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                                task.get('project'), task.get('taskid'), url, result['time'])
                else:
                    logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                                 task.get('project'), task.get('taskid'),
                                 url, result['content'], result['time'])
        
                raise gen.Return(result)
        
            @gen.coroutine
            def splash_fetch(self, url, task):
                '''Fetch with splash'''
                start_time = time.time()
                self.on_fetch('splash', task)
                handle_error = lambda x: self.handle_error('splash', url, task, start_time, x)
        
                # check splash endpoint is enabled
                if not self.splash_endpoint:
                    result = {
                        "orig_url": url,
                        "content": "splash is not enabled.",
                        "headers": {},
                        "status_code": 501,
                        "url": url,
                        "time": time.time() - start_time,
                        "cookies": {},
                        "save": task.get('fetch', {}).get('save')
                    }
                    logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
                    raise gen.Return(result)
        
                # setup request parameters
                fetch = self.pack_tornado_request_parameters(url, task)
                task_fetch = task.get('fetch', {})
                for each in task_fetch:
                    if each not in fetch:
                        fetch[each] = task_fetch[each]
        
                # robots.txt
                if task_fetch.get('robots_txt', False):
                    user_agent = fetch['headers']['User-Agent']
                    can_fetch = yield self.can_fetch(user_agent, url)
                    if not can_fetch:
                        error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                        raise gen.Return(handle_error(error))
        
                request_conf = {
                    'follow_redirects': False,
                    'headers': {
                        'Content-Type': 'application/json',
                    }
                }
                request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
                request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1
        
                session = cookies.RequestsCookieJar()
                if 'Cookie' in fetch['headers']:
                    c = http_cookies.SimpleCookie()
                    try:
                        c.load(fetch['headers']['Cookie'])
                    except AttributeError:
                        c.load(utils.utf8(fetch['headers']['Cookie']))
                    for key in c:
                        session.set(key, c[key])
                    del fetch['headers']['Cookie']
                if 'cookies' in fetch:
                    session.update(fetch['cookies'])
                    del fetch['cookies']
        
                request = tornado.httpclient.HTTPRequest(url=fetch['url'])
                cookie_header = cookies.get_cookie_header(session, request)
                if cookie_header:
                    fetch['headers']['Cookie'] = cookie_header
        
                # making requests
                fetch['lua_source'] = self.splash_lua_source
                fetch['headers'] = dict(fetch['headers'])
                try:
                    request = tornado.httpclient.HTTPRequest(
                        url=self.splash_endpoint, method="POST",
                        body=json.dumps(fetch), **request_conf)
                except Exception as e:
                    raise gen.Return(handle_error(e))
        
                try:
                    response = yield gen.maybe_future(self.http_client.fetch(request))
                except tornado.httpclient.HTTPError as e:
                    if e.response:
                        response = e.response
                    else:
                        raise gen.Return(handle_error(e))
        
                if not response.body:
                    raise gen.Return(handle_error(Exception('no response from splash')))
        
                result = {}
                try:
                    result = json.loads(utils.text(response.body))
                    assert 'status_code' in result, result
                except ValueError as e:
                    logger.error("result is not json: %r", response.body[:500])
                    raise gen.Return(handle_error(e))
                except Exception as e:
                    if response.error:
                        result['error'] = utils.text(response.error)
                    raise gen.Return(handle_error(e))
        
                if result.get('status_code', 200):
                    logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                                task.get('project'), task.get('taskid'), url, result['time'])
                else:
                    logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                                 task.get('project'), task.get('taskid'),
                                 url, result['content'], result['time'])
        
                raise gen.Return(result)
        
            def run(self):
                '''Run loop'''
                logger.info("fetcher starting...")
        
                def queue_loop():
                    if not self.outqueue or not self.inqueue:
                        return
                    while not self._quit:
                        try:
                            if self.outqueue.full():
                                break
                            if self.http_client.free_size() <= 0:
                                break
                            task = self.inqueue.get_nowait()
                            # FIXME: decode_unicode_obj should be used after data is selected from
                            # the database; it's used here for performance
                            task = utils.decode_unicode_obj(task)
                            self.fetch(task)
                        except queue.Empty:
                            break
                        except KeyboardInterrupt:
                            break
                        except Exception as e:
                            logger.exception(e)
                            break
        
                tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
                tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
                self._running = True
        
                try:
                    self.ioloop.start()
                except KeyboardInterrupt:
                    pass
        
                logger.info("fetcher exiting...")
        
            def quit(self):
                '''Quit fetcher'''
                self._running = False
                self._quit = True
                self.ioloop.add_callback(self.ioloop.stop)
                if hasattr(self, 'xmlrpc_server'):
                    self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
                    self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
        
            def size(self):
                return self.http_client.size()
        
            def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
                '''Run xmlrpc server'''
                import umsgpack
                from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
                try:
                    from xmlrpc.client import Binary
                except ImportError:
                    from xmlrpclib import Binary
        
                application = WSGIXMLRPCApplication()
        
                application.register_function(self.quit, '_quit')
                application.register_function(self.size)
        
                def sync_fetch(task):
                    result = self.sync_fetch(task)
                    result = Binary(umsgpack.packb(result))
                    return result
                application.register_function(sync_fetch, 'fetch')
        
                def dump_counter(_time, _type):
                    return self._cnt[_time].to_dict(_type)
                application.register_function(dump_counter, 'counter')
        
                import tornado.wsgi
                import tornado.ioloop
                import tornado.httpserver
        
                container = tornado.wsgi.WSGIContainer(application)
                self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
                self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
                self.xmlrpc_server.listen(port=port, address=bind)
                logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
                self.xmlrpc_ioloop.start()
        
            def on_fetch(self, type, task):
                '''Called before task fetch'''
                pass
        
            def on_result(self, type, task, result):
                '''Called after task fetched'''
                status_code = result.get('status_code', 599)
                if status_code != 599:
                    status_code = (int(status_code) / 100 * 100)
                self._cnt['5m'].event((task.get('project'), status_code), +1)
                self._cnt['1h'].event((task.get('project'), status_code), +1)
        
                if type in ('http', 'phantomjs') and result.get('time'):
                    content_len = len(result.get('content', ''))
                    self._cnt['5m'].event((task.get('project'), 'speed'),
                                          float(content_len) / result.get('time'))
                    self._cnt['1h'].event((task.get('project'), 'speed'),
                                          float(content_len) / result.get('time'))
                    self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
                    self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
      • 3. webui/app.py (replace the entire file, renaming async to async_mode; if you are unsure where the installed files live, see the path one-liner after the code below)
      • #!/usr/bin/env python
        # -*- encoding: utf-8 -*-
        # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
        # Author: Binux<i@binux.me>
        #         http://binux.me
        # Created on 2014-02-22 23:17:13
        
        import os
        import sys
        import logging
        logger = logging.getLogger("webui")
        
        from six import reraise
        from six.moves import builtins
        from six.moves.urllib.parse import urljoin
        from flask import Flask
        from pyspider.fetcher import tornado_fetcher
        
        if os.name == 'nt':
            import mimetypes
            mimetypes.add_type("text/css", ".css", True)
        
        
        class QuitableFlask(Flask):
            """Add quit() method to Flask object"""
        
            @property
            def logger(self):
                return logger
        
            def run(self, host=None, port=None, debug=None, **options):
                import tornado.wsgi
                import tornado.ioloop
                import tornado.httpserver
                import tornado.web
        
                if host is None:
                    host = '127.0.0.1'
                if port is None:
                    server_name = self.config['SERVER_NAME']
                    if server_name and ':' in server_name:
                        port = int(server_name.rsplit(':', 1)[1])
                    else:
                        port = 5000
                if debug is not None:
                    self.debug = bool(debug)
        
                hostname = host
                port = port
                application = self
                use_reloader = self.debug
                use_debugger = self.debug
        
                if use_debugger:
                    from werkzeug.debug import DebuggedApplication
                    application = DebuggedApplication(application, True)
        
                try:
                    from .webdav import dav_app
                except ImportError as e:
                    logger.warning('WebDav interface not enabled: %r', e)
                    dav_app = None
                if dav_app:
                    from werkzeug.wsgi import DispatcherMiddleware
                    application = DispatcherMiddleware(application, {
                        '/dav': dav_app
                    })
        
                container = tornado.wsgi.WSGIContainer(application)
                self.http_server = tornado.httpserver.HTTPServer(container)
                self.http_server.listen(port, hostname)
                if use_reloader:
                    from tornado import autoreload
                    autoreload.start()
        
                self.logger.info('webui running on %s:%s', hostname, port)
                self.ioloop = tornado.ioloop.IOLoop.current()
                self.ioloop.start()
        
            def quit(self):
                if hasattr(self, 'ioloop'):
                    self.ioloop.add_callback(self.http_server.stop)
                    self.ioloop.add_callback(self.ioloop.stop)
                self.logger.info('webui exiting...')
        
        
        app = QuitableFlask('webui',
                            static_folder=os.path.join(os.path.dirname(__file__), 'static'),
                            template_folder=os.path.join(os.path.dirname(__file__), 'templates'))
        app.secret_key = os.urandom(24)
        app.jinja_env.line_statement_prefix = '#'
        app.jinja_env.globals.update(builtins.__dict__)
        
        app.config.update({
            'fetch': lambda x: tornado_fetcher.Fetcher(None, None, async_mode=False).fetch(x),
            'taskdb': None,
            'projectdb': None,
            'scheduler_rpc': None,
            'queues': dict(),
            'process_time_limit': 30,
        })
        
        
        def cdn_url_handler(error, endpoint, kwargs):
            if endpoint == 'cdn':
                path = kwargs.pop('path')
                # cdn = app.config.get('cdn', 'http://cdn.staticfile.org/')
                # cdn = app.config.get('cdn', '//cdnjs.cloudflare.com/ajax/libs/')
                cdn = app.config.get('cdn', '//cdnjscn.b0.upaiyun.com/libs/')
                return urljoin(cdn, path)
            else:
                exc_type, exc_value, tb = sys.exc_info()
                if exc_value is error:
                    reraise(exc_type, exc_value, tb)
                else:
                    raise error
        app.handle_url_build_error = cdn_url_handler
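    That completes the file replacements for Step 1. If you are not sure where the installed pyspider package lives (the site-packages path differs between environments), the following one-liner prints its directory; it is only a convenience for locating the files, not part of the fix itself:

    python -c "import pyspider, os; print(os.path.dirname(pyspider.__file__))"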

    Step 2: modify line 209 of webdav.py

    If the run then fails with: ValueError: Invalid configuration: - Deprecated option 'domaincontroller': use 'http_authenticator

    • Cause: WsgiDAV has published its pre-release 3.x series, which changed the configuration format.
    • Fix
      • In your installed packages, locate the pyspider package, open webdav.py inside the webui directory, and edit line 209 as shown below.

    'domaincontroller': NeedAuthController(app),

    Change it to:

    'http_authenticator': {
        'HTTPAuthenticator': NeedAuthController(app),
    },
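    For orientation, the key being replaced sits inside the config dict that webdav.py passes to WsgiDAVApp. The sketch below is approximate (neighbouring entries and the exact line number can vary between pyspider releases); only the commented-out old key and its replacement are the actual change:

    # approximate context around line 209 of pyspider/webui/webdav.py
    config = DEFAULT_CONFIG.copy()
    config.update({
        'mount_path': '/dav',
        'provider_mapping': {
            '/': ScriptProvider(app)
        },
        # old key, rejected by WsgiDAV 3.x:
        # 'domaincontroller': NeedAuthController(app),
        # replacement from the step above:
        'http_authenticator': {
            'HTTPAuthenticator': NeedAuthController(app),
        },
    })
    dav_app = WsgiDAVApp(config)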

    Step 3: downgrade WsgiDAV

    python -m pip install wsgidav==2.4.1

    Then run pyspider all again, and the web UI should be reachable at http://localhost:5000.
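    To confirm the downgrade took effect before restarting, you can check the installed version from the same environment that runs pyspider (both commands should report 2.4.1):

    python -m pip show wsgidav
    python -c "import wsgidav; print(wsgidav.__version__)"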

  • Original article: https://www.cnblogs.com/akinodoo/p/12175423.html