cli()   F

Complexity

Conditions 28

Size

Total Lines 105

Duplication

Lines 0
Ratio 0%

Importance

Changes 1
Bugs 1 Features 0
Metric    Value
cc        28
dl        0
loc       105
rs        2
c         1
b         1
f         0

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
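
As a quick sketch of Extract Method (hypothetical invoice-printing helpers, not code from this file), a commented block inside a long method becomes its own method, named after the comment:

def print_owing(self, invoice):
    self.print_banner()
    # print details
    print('name: %s' % invoice.name)
    print('amount: %s' % invoice.amount)

After extracting, the comment has turned into the method name:

def print_owing(self, invoice):
    self.print_banner()
    self.print_details(invoice)

def print_details(self, invoice):
    # the extracted block; the old comment suggested the name
    print('name: %s' % invoice.name)
    print('amount: %s' % invoice.amount)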

Complexity

Complex code like cli() often does a lot of different things. To break it down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring, as in the sketch below. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
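
A minimal sketch of Extract Class (hypothetical classes, echoing the xmlrpc_host/xmlrpc_port option pairs the subcommands below keep repeating): fields that share a prefix move behind one object.

# Before: the 'xmlrpc_' prefix hints at a cohesive component.
class Server(object):
    def __init__(self):
        self.xmlrpc_enabled = True
        self.xmlrpc_host = '0.0.0.0'
        self.xmlrpc_port = 23333

# After: Extract Class groups the prefixed fields into their own class.
class XmlRpcConfig(object):
    def __init__(self, enabled=True, host='0.0.0.0', port=23333):
        self.enabled = enabled
        self.host = host
        self.port = port

class Server(object):
    def __init__(self):
        self.xmlrpc = XmlRpcConfig()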

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-03-05 00:11:49


import os
import sys
import six
import copy
import time
import shutil
import logging
import logging.config

import click
import pyspider
from pyspider.message_queue import connect_message_queue
from pyspider.database import connect_database
from pyspider.libs import utils


def read_config(ctx, param, value):
    if not value:
        return {}
    import json

    def underline_dict(d):
        if not isinstance(d, dict):
            return d
        return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d))

    config = underline_dict(json.load(value))
    ctx.default_map = config
    return config


def connect_db(ctx, param, value):
    if not value:
        return
    return utils.Get(lambda: connect_database(value))


def load_cls(ctx, param, value):
    if isinstance(value, six.string_types):
        return utils.load_object(value)
    return value


def connect_rpc(ctx, param, value):
    if not value:
        return
    try:
        from six.moves import xmlrpc_client
    except ImportError:
        import xmlrpclib as xmlrpc_client
    return xmlrpc_client.ServerProxy(value, allow_none=True)


@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.
    """
    if kwargs['add_sys_path']:
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # get db from env
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            continue
        if os.environ.get('MYSQL_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif ctx.invoked_subcommand == 'bench':
            if kwargs['data_path'] == './data':
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                os.mkdir(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            elif db in ('projectdb', ):
                kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % (
                    db, os.path.join(os.path.dirname(__file__), 'libs/bench.py'))))
        else:
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.mkdir(kwargs['data_path'])

    # message queue, compatible with old version
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
    elif kwargs.get('beanstalk'):
        kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']

    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx


@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=True)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
              help='size limit of task queue for each project, '
              'tasks will be ignored when it overflows')
@click.option('--delete-time', default=24 * 60 * 60,
              help='delete time before marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks handled in a loop')
@click.option('--fail-pause-num', default=10, help='auto pause the project when the last FAIL_PAUSE_NUM tasks failed, set 0 to disable')
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
              help='scheduler class to be used.')
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
              inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num,
              scheduler_cls, threads, get_object=False):
    """
    Run Scheduler, only one scheduler is allowed.
    """
    g = ctx.obj
    Scheduler = load_cls(None, None, scheduler_cls)

    kwargs = dict(taskdb=g.taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                  newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                  out_queue=g.scheduler2fetcher, data_path=g.get('data_path', 'data'))
    if threads:
        kwargs['threads'] = int(threads)

    scheduler = Scheduler(**kwargs)
    scheduler.INQUEUE_LIMIT = inqueue_limit
    scheduler.DELETE_TIME = delete_time
    scheduler.ACTIVE_TASKS = active_tasks
    scheduler.LOOP_LIMIT = loop_limit
    scheduler.FAIL_PAUSE_NUM = fail_pause_num

    g.instances.append(scheduler)
    if g.get('testing_mode') or get_object:
        return scheduler

    if xmlrpc:
        utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    scheduler.run()


@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=False)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
@click.option('--poolsize', default=100, help="max simultaneous fetches")
@click.option('--proxy', help="proxy host:port")
@click.option('--user-agent', help='user agent')
@click.option('--timeout', help='default fetch timeout')
@click.option('--phantomjs-endpoint', help="endpoint of phantomjs, start via pyspider phantomjs")
@click.option('--splash-endpoint', help="execute endpoint of splash: http://splash.readthedocs.io/en/stable/api.html#execute")
@click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
              help='Fetcher class to be used.')
@click.pass_context
def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
            timeout, phantomjs_endpoint, splash_endpoint, fetcher_cls,
            async_mode=True, get_object=False, no_input=False):
    """
    Run Fetcher.
    """
    g = ctx.obj
    Fetcher = load_cls(None, None, fetcher_cls)

    if no_input:
        inqueue = None
        outqueue = None
    else:
        inqueue = g.scheduler2fetcher
        outqueue = g.fetcher2processor
    fetcher = Fetcher(inqueue=inqueue, outqueue=outqueue,
                      poolsize=poolsize, proxy=proxy, async_mode=async_mode)
    fetcher.phantomjs_proxy = phantomjs_endpoint or g.phantomjs_proxy
    fetcher.splash_endpoint = splash_endpoint
    if user_agent:
        fetcher.user_agent = user_agent
    if timeout:
        fetcher.default_options = copy.deepcopy(fetcher.default_options)
        fetcher.default_options['timeout'] = timeout

    g.instances.append(fetcher)
    if g.get('testing_mode') or get_object:
        return fetcher

    if xmlrpc:
        utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    fetcher.run()


@cli.command()
@click.option('--processor-cls', default='pyspider.processor.Processor',
              callback=load_cls, help='Processor class to be used.')
@click.option('--process-time-limit', default=30, help='script process time limit')
@click.pass_context
def processor(ctx, processor_cls, process_time_limit, enable_stdout_capture=True, get_object=False):
    """
    Run Processor.
    """
    g = ctx.obj
    Processor = load_cls(None, None, processor_cls)

    processor = Processor(projectdb=g.projectdb,
                          inqueue=g.fetcher2processor, status_queue=g.status_queue,
                          newtask_queue=g.newtask_queue, result_queue=g.processor2result,
                          enable_stdout_capture=enable_stdout_capture,
                          process_time_limit=process_time_limit)

    g.instances.append(processor)
    if g.get('testing_mode') or get_object:
        return processor

    processor.run()


@cli.command()
@click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
              help='ResultWorker class to be used.')
@click.pass_context
def result_worker(ctx, result_cls, get_object=False):
    """
    Run result worker.
    """
    g = ctx.obj
    ResultWorker = load_cls(None, None, result_cls)

    result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result)

    g.instances.append(result_worker)
    if g.get('testing_mode') or get_object:
        return result_worker

    result_worker.run()


@cli.command()
@click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
              help='webui bind to host')
@click.option('--port', default=5000, envvar='WEBUI_PORT',
              help='webui bind to port')
@click.option('--cdn', default='//cdnjs.cloudflare.com/ajax/libs/',
              help='js/css cdn server')
@click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
@click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
@click.option('--max-rate', type=float, help='max rate for each project')
@click.option('--max-burst', type=float, help='max burst for each project')
@click.option('--username', envvar='WEBUI_USERNAME',
              help='username of locked projects')
@click.option('--password', envvar='WEBUI_PASSWORD',
              help='password of locked projects')
@click.option('--need-auth', is_flag=True, default=False, help='need username and password')
@click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
              help='webui Flask Application instance to be used.')
@click.option('--process-time-limit', default=30, help='script process time limit in debug')
@click.pass_context
def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
          username, password, need_auth, webui_instance, process_time_limit, get_object=False):
    """
    Run WebUI
    """
    app = load_cls(None, None, webui_instance)

    g = ctx.obj
    app.config['taskdb'] = g.taskdb
    app.config['projectdb'] = g.projectdb
    app.config['resultdb'] = g.resultdb
    app.config['cdn'] = cdn

    if max_rate:
        app.config['max_rate'] = max_rate
    if max_burst:
        app.config['max_burst'] = max_burst
    if username:
        app.config['webui_username'] = username
    if password:
        app.config['webui_password'] = password
    app.config['need_auth'] = need_auth
    app.config['process_time_limit'] = process_time_limit

    # inject queues for webui
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        app.config['queues'][name] = getattr(g, name, None)

    # fetcher rpc
    if isinstance(fetcher_rpc, six.string_types):
        import umsgpack
        fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
        app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
    else:
        # get fetcher instance for webui
        fetcher_config = g.config.get('fetcher', {})
        webui_fetcher = ctx.invoke(fetcher, async_mode=False, get_object=True, no_input=True, **fetcher_config)

        app.config['fetch'] = lambda x: webui_fetcher.fetch(x)

    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    elif scheduler_rpc is None:
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
    else:
        app.config['scheduler_rpc'] = scheduler_rpc

    app.debug = g.debug
    g.instances.append(app)
    if g.get('testing_mode') or get_object:
        return app

    app.run(host=host, port=port)


@cli.command()
@click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
@click.option('--port', default=25555, help='phantomjs port')
@click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
@click.argument('args', nargs=-1)
@click.pass_context
def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
    """
    Run phantomjs fetcher if phantomjs is installed.
    """
    args = args or ctx.default_map and ctx.default_map.get('args', [])

    import subprocess
    g = ctx.obj
    _quit = []
    phantomjs_fetcher = os.path.join(
        os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
    cmd = [phantomjs_path,
           # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
           #'--load-images=false',
           '--ssl-protocol=any',
           '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]

    try:
        _phantomjs = subprocess.Popen(cmd)
    except OSError:
        logging.warning('phantomjs not found, continue running without it.')
        return None

    def quit(*args, **kwargs):
        _quit.append(1)
        _phantomjs.kill()
        _phantomjs.wait()
        logging.info('phantomjs exited.')

    if not g.get('phantomjs_proxy'):
        g['phantomjs_proxy'] = '127.0.0.1:%s' % port

    phantomjs = utils.ObjectDict(port=port, quit=quit)
    g.instances.append(phantomjs)
    if g.get('testing_mode'):
        return phantomjs

    while True:
        _phantomjs.wait()
        if _quit or not auto_restart:
            break
        _phantomjs = subprocess.Popen(cmd)


@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=1, help='instance num of processor')
@click.option('--result-worker-num', default=1,
              help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each component in a thread or subprocess. '
              'always uses threads on windows.')
@click.pass_context
def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
    """
    Run all the components in subprocess or thread
    """

    ctx.obj['debug'] = False
    g = ctx.obj

    # FIXME: py34 cannot run components with threads
    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    threads = []

    try:
        # phantomjs
        if not g.get('phantomjs_proxy'):
            phantomjs_config = g.config.get('phantomjs', {})
            phantomjs_config.setdefault('auto_restart', True)
            threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
            time.sleep(2)
            if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor, **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))

        # running webui in main thread to make it exitable
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        ctx.invoke(webui, **webui_config)
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if not each.is_alive():
                continue
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join()


@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=2, help='instance num of processor')
@click.option('--result-worker-num', default=1, help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each component in a thread or subprocess. '
              'always uses threads on windows.')
@click.option('--total', default=10000, help="total url in test page")
@click.option('--show', default=20, help="show how many urls in a page")
@click.option('--taskdb-bench', default=False, is_flag=True,
              help="only run taskdb bench test")
@click.option('--message-queue-bench', default=False, is_flag=True,
              help="only run message queue bench test")
@click.option('--all-bench', default=False, is_flag=True,
              help="only run all bench test")
@click.pass_context
def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
          taskdb_bench, message_queue_bench, all_bench):
    """
    Run Benchmark test.
    In bench mode, in-memory sqlite database is used instead of on-disk sqlite database.
    """
    from pyspider.libs import bench
    from pyspider.webui import bench_test  # flake8: noqa

    ctx.obj['debug'] = False
    g = ctx.obj
    if result_worker_num == 0:
        g['processor2result'] = None

    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    all_test = not taskdb_bench and not message_queue_bench and not all_bench

    # test taskdb
    if all_test or taskdb_bench:
        bench.bench_test_taskdb(g.taskdb)
    # test message queue
    if all_test or message_queue_bench:
        bench.bench_test_message_queue(g.scheduler2fetcher)
    # test all
    if not all_test and not all_bench:
        return

    project_name = 'bench'

    def clear_project():
        g.taskdb.drop(project_name)
        g.resultdb.drop(project_name)

    clear_project()

    # disable log
    logging.getLogger().setLevel(logging.ERROR)
    logging.getLogger('scheduler').setLevel(logging.ERROR)
    logging.getLogger('fetcher').setLevel(logging.ERROR)
    logging.getLogger('processor').setLevel(logging.ERROR)
    logging.getLogger('result').setLevel(logging.ERROR)
    logging.getLogger('webui').setLevel(logging.ERROR)
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    try:
        threads = []

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker,
                                  result_cls='pyspider.libs.bench.BenchResultWorker',
                                  **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor,
                                  processor_cls='pyspider.libs.bench.BenchProcessor',
                                  **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher,
                                  fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                  **fetcher_config))

        # webui
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        threads.append(run_in(ctx.invoke, webui, **webui_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        scheduler_config.setdefault('xmlrpc_port', 23333)
        threads.append(run_in(ctx.invoke, scheduler,
                              scheduler_cls='pyspider.libs.bench.BenchScheduler',
                              **scheduler_config))
        scheduler_rpc = connect_rpc(ctx, None,
                                    'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)

        for _ in range(20):
            if utils.check_port_open(23333):
                break
            time.sleep(1)

        scheduler_rpc.newtask({
            "project": project_name,
            "taskid": "on_start",
            "url": "data:,on_start",
            "fetch": {
                "save": {"total": total, "show": show}
            },
            "process": {
                "callback": "on_start",
            },
        })

        # wait bench test finished
        while True:
            time.sleep(1)
            if scheduler_rpc.size() == 0:
                break
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join(1)

        clear_project()


@cli.command()
@click.option('-i', '--interactive', default=False, is_flag=True,
              help='enable interactive mode, you can choose the url to crawl.')
@click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
              help='enable phantomjs, will spawn a subprocess for phantomjs')
@click.argument('scripts', nargs=-1)
@click.pass_context
def one(ctx, interactive, enable_phantomjs, scripts):
    """
    One mode not only means all-in-one, it runs everything in one process over
    tornado.ioloop, for debugging purposes
    """

    ctx.obj['debug'] = False
    g = ctx.obj
    g['testing_mode'] = True

    if scripts:
        from pyspider.database.local.projectdb import ProjectDB
        g['projectdb'] = ProjectDB(scripts)
        if g.get('is_taskdb_default'):
            g['taskdb'] = connect_database('sqlite+taskdb://')
        if g.get('is_resultdb_default'):
            g['resultdb'] = None

    if enable_phantomjs:
        phantomjs_config = g.config.get('phantomjs', {})
        phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
        if phantomjs_obj:
            g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
    else:
        phantomjs_obj = None

    result_worker_config = g.config.get('result_worker', {})
    if g.resultdb is None:
        result_worker_config.setdefault('result_cls',
                                        'pyspider.result.OneResultWorker')
    result_worker_obj = ctx.invoke(result_worker, **result_worker_config)

    processor_config = g.config.get('processor', {})
    processor_config.setdefault('enable_stdout_capture', False)
    processor_obj = ctx.invoke(processor, **processor_config)

    fetcher_config = g.config.get('fetcher', {})
    fetcher_config.setdefault('xmlrpc', False)
    fetcher_obj = ctx.invoke(fetcher, **fetcher_config)

    scheduler_config = g.config.get('scheduler', {})
    scheduler_config.setdefault('xmlrpc', False)
    scheduler_config.setdefault('scheduler_cls',
                                'pyspider.scheduler.OneScheduler')
    scheduler_obj = ctx.invoke(scheduler, **scheduler_config)

    scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                           fetcher=fetcher_obj,
                           processor=processor_obj,
                           result_worker=result_worker_obj,
                           interactive=interactive)
    if scripts:
        for project in g.projectdb.projects:
            scheduler_obj.trigger_on_start(project)

    try:
        scheduler_obj.run()
    finally:
        scheduler_obj.quit()
        if phantomjs_obj:
            phantomjs_obj.quit()


@cli.command()
@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
@click.argument('project', nargs=1)
@click.argument('message', nargs=1)
@click.pass_context
def send_message(ctx, scheduler_rpc, project, message):
    """
    Send Message to project from command line
    """
    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    if scheduler_rpc is None:
        scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')

    return scheduler_rpc.send_task({
        'taskid': utils.md5string('data:,on_message'),
        'project': project,
        'url': 'data:,on_message',
        'fetch': {
            'save': ('__command__', message),
        },
        'process': {
            'callback': '_on_message',
        }
    })


def main():
    cli()

if __name__ == '__main__':
    main()
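
For reference, a minimal sketch (assuming pyspider and click are importable; the option value is arbitrary) of exercising cli() in-process with click's test runner. The testing_mode flag checked throughout the code above makes cli() and its subcommands return their objects instead of entering the blocking run() loops:

from click.testing import CliRunner

from pyspider.run import cli

runner = CliRunner()
# with testing_mode set and no subcommand given, cli() wires up the
# databases and queues, skips ctx.invoke(all), and returns immediately
result = runner.invoke(cli, ['--queue-maxsize', '10'], obj={'testing_mode': True})
print(result.exit_code)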