Completed — Push, master (39eece...c8d455) by Roy, created 01:11

all() — grade F

Complexity
  Conditions: 13

Size
  Total Lines: 73

Duplication
  Lines: 0
  Ratio: 0 %

Metric  Value
cc      13
dl      0
loc     73
rs      2.3081
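The summary rows and the metric table line up: cc is the cyclomatic complexity (the 13 conditions), loc the 73 total lines, and dl the 0 duplicated lines; rs is the tool's own score. These figures can be checked locally with a complexity analyzer — a minimal sketch, assuming radon and the file path, neither of which the report names:

# Reproducing cc/loc locally. Assumptions: the analyzer is radon
# (pip install radon) and the listing below is pyspider/run.py.
from radon.complexity import cc_visit
from radon.raw import analyze

with open('pyspider/run.py') as f:  # path assumed from the listing below
    source = f.read()

# cyclomatic complexity per function; the report gives cc 13 for all()
for block in cc_visit(source):
    if block.name == 'all':
        print(block.name, block.complexity)

# raw line counts for the whole file; the report's loc 73 covers all() alone
print(analyze(source).loc)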

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method, using the comment as a starting point for the new method's name.

Commonly applied refactorings include Extract Method.
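As a concrete illustration drawn from the listing below: in cli(), the block under the comment "# message queue, compatible with old version" could be extracted into a method named after that comment. A minimal sketch — the helper name is hypothetical, not part of pyspider:

import os

def resolve_message_queue(kwargs):
    """Hypothetical extract-method target: pick a message-queue url,
    keeping compatibility with the deprecated --amqp-url/--beanstalk options."""
    if kwargs.get('message_queue'):
        return kwargs['message_queue']
    if kwargs.get('amqp_url'):
        return kwargs['amqp_url']
    if os.environ.get('RABBITMQ_NAME'):
        return ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
    if kwargs.get('beanstalk'):
        return "beanstalk://%s/" % kwargs['beanstalk']
    return None

# in cli(), the commented block then collapses to a single line:
# kwargs['message_queue'] = resolve_message_queue(kwargs)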

Complexity

Complex classes like all() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
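In the listing below, for instance, taskdb, projectdb and resultdb share a suffix and are always configured and passed around together. A hedged sketch of the component they could form — the class and method names are hypothetical:

class Databases(object):
    """Hypothetical extract-class target grouping the three *db handles."""

    def __init__(self, taskdb, projectdb, resultdb):
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb

    @classmethod
    def from_environment(cls, data_path):
        # the MYSQL_NAME/MONGODB_NAME/sqlite fallback logic in cli()
        # would move here, out of the command-line layer
        raise NotImplementedError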

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<[email protected]>
#         http://binux.me
# Created on 2014-03-05 00:11:49


import os
import sys
import six
import copy
import time
import shutil
import logging
import logging.config

import click
import pyspider
from pyspider.message_queue import connect_message_queue
from pyspider.database import connect_database
from pyspider.libs import utils


# click callback: load a json config file and install it as the default option map
def read_config(ctx, param, value):
    if not value:
        return {}
    import json

    def underline_dict(d):
        if not isinstance(d, dict):
            return d
        return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d))

    config = underline_dict(json.load(value))
    ctx.default_map = config
    return config


# click callback: wrap a database url in a lazy connection
def connect_db(ctx, param, value):
    if not value:
        return
    return utils.Get(lambda: connect_database(value))


# click callback: import a class given as a dotted path string
def load_cls(ctx, param, value):
    if isinstance(value, six.string_types):
        return utils.load_object(value)
    return value


# click callback: build an xmlrpc client for the given url
def connect_rpc(ctx, param, value):
    if not value:
        return
    try:
        from six.moves import xmlrpc_client
    except ImportError:
        import xmlrpclib as xmlrpc_client
    return xmlrpc_client.ServerProxy(value, allow_none=True)


@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.
    """
    if kwargs['add_sys_path']:
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # get db from env
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            continue
        if os.environ.get('MYSQL_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif ctx.invoked_subcommand == 'bench':
            if kwargs['data_path'] == './data':
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                os.mkdir(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            else:
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                    db, kwargs['data_path'], db[:-2])))
        else:
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.mkdir(kwargs['data_path'])

    # message queue, compatible with old version
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
    elif kwargs.get('beanstalk'):
        kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']

    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx


@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=True)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
              help='size limit of task queue for each project, '
              'tasks will be ignored on overflow')
@click.option('--delete-time', default=24 * 60 * 60,
              help='seconds to wait before deleting a project marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks handled within one loop')
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
              help='scheduler class to be used.')
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
              inqueue_limit, delete_time, active_tasks, loop_limit, scheduler_cls,
              threads):
    """
    Run Scheduler, only one scheduler is allowed.
    """
    g = ctx.obj
    Scheduler = load_cls(None, None, scheduler_cls)

    kwargs = dict(taskdb=g.taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                  newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                  out_queue=g.scheduler2fetcher, data_path=g.get('data_path', 'data'))
    if threads:
        kwargs['threads'] = int(threads)

    scheduler = Scheduler(**kwargs)
    scheduler.INQUEUE_LIMIT = inqueue_limit
    scheduler.DELETE_TIME = delete_time
    scheduler.ACTIVE_TASKS = active_tasks
    scheduler.LOOP_LIMIT = loop_limit

    g.instances.append(scheduler)
    if g.get('testing_mode'):
        return scheduler

    if xmlrpc:
        utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    scheduler.run()


@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=False)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
@click.option('--poolsize', default=100, help="max simultaneous fetches")
@click.option('--proxy', help="proxy host:port")
@click.option('--user-agent', help='user agent')
@click.option('--timeout', help='default fetch timeout')
@click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
              help='Fetcher class to be used.')
@click.pass_context
def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
            timeout, fetcher_cls, async=True):  # note: `async` is a reserved word from Python 3.7 on
    """
    Run Fetcher.
    """
    g = ctx.obj
    Fetcher = load_cls(None, None, fetcher_cls)

    fetcher = Fetcher(inqueue=g.scheduler2fetcher, outqueue=g.fetcher2processor,
                      poolsize=poolsize, proxy=proxy, async=async)
    fetcher.phantomjs_proxy = g.phantomjs_proxy
    if user_agent:
        fetcher.user_agent = user_agent
    if timeout:
        fetcher.default_options = copy.deepcopy(fetcher.default_options)
        fetcher.default_options['timeout'] = timeout

    g.instances.append(fetcher)
    if g.get('testing_mode'):
        return fetcher

    if xmlrpc:
        utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    fetcher.run()


@cli.command()
@click.option('--processor-cls', default='pyspider.processor.Processor',
              callback=load_cls, help='Processor class to be used.')
@click.pass_context
def processor(ctx, processor_cls, enable_stdout_capture=True):
    """
    Run Processor.
    """
    g = ctx.obj
    Processor = load_cls(None, None, processor_cls)

    processor = Processor(projectdb=g.projectdb,
                          inqueue=g.fetcher2processor, status_queue=g.status_queue,
                          newtask_queue=g.newtask_queue, result_queue=g.processor2result,
                          enable_stdout_capture=enable_stdout_capture)

    g.instances.append(processor)
    if g.get('testing_mode'):
        return processor

    processor.run()


@cli.command()
@click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
              help='ResultWorker class to be used.')
@click.pass_context
def result_worker(ctx, result_cls):
    """
    Run result worker.
    """
    g = ctx.obj
    ResultWorker = load_cls(None, None, result_cls)

    result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result)

    g.instances.append(result_worker)
    if g.get('testing_mode'):
        return result_worker

    result_worker.run()


@cli.command()
@click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
              help='webui bind to host')
@click.option('--port', default=5000, envvar='WEBUI_PORT',
              help='webui bind to port')
@click.option('--cdn', default='//cdnjscn.b0.upaiyun.com/libs/',
              help='js/css cdn server')
@click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
@click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
@click.option('--max-rate', type=float, help='max rate for each project')
@click.option('--max-burst', type=float, help='max burst for each project')
@click.option('--username', envvar='WEBUI_USERNAME',
              help='username of locked projects')
@click.option('--password', envvar='WEBUI_PASSWORD',
              help='password of locked projects')
@click.option('--need-auth', is_flag=True, default=False, help='need username and password')
@click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
              help='webui Flask Application instance to be used.')
@click.pass_context
def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
          username, password, need_auth, webui_instance):
    """
    Run WebUI
    """
    app = load_cls(None, None, webui_instance)

    g = ctx.obj
    app.config['taskdb'] = g.taskdb
    app.config['projectdb'] = g.projectdb
    app.config['resultdb'] = g.resultdb
    app.config['cdn'] = cdn

    if max_rate:
        app.config['max_rate'] = max_rate
    if max_burst:
        app.config['max_burst'] = max_burst
    if username:
        app.config['webui_username'] = username
    if password:
        app.config['webui_password'] = password
    app.config['need_auth'] = need_auth

    # inject queues for webui
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        app.config['queues'][name] = getattr(g, name, None)

    # fetcher rpc
    if isinstance(fetcher_rpc, six.string_types):
        import umsgpack
        fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
        app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
    else:
        # get fetcher instance for webui
        fetcher_config = g.config.get('fetcher', {})
        scheduler2fetcher = g.scheduler2fetcher
        fetcher2processor = g.fetcher2processor
        testing_mode = g.get('testing_mode', False)
        g['scheduler2fetcher'] = None
        g['fetcher2processor'] = None
        g['testing_mode'] = True
        webui_fetcher = ctx.invoke(fetcher, async=False, **fetcher_config)
        g['scheduler2fetcher'] = scheduler2fetcher
        g['fetcher2processor'] = fetcher2processor
        g['testing_mode'] = testing_mode

        app.config['fetch'] = lambda x: webui_fetcher.fetch(x)

    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    elif scheduler_rpc is None:
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
    else:
        app.config['scheduler_rpc'] = scheduler_rpc

    app.debug = g.debug
    g.instances.append(app)
    if g.get('testing_mode'):
        return app

    app.run(host=host, port=port)


@cli.command()
@click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
@click.option('--port', default=25555, help='phantomjs port')
@click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
@click.argument('args', nargs=-1)
@click.pass_context
def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
    """
    Run phantomjs fetcher if phantomjs is installed.
    """
    args = args or ctx.default_map and ctx.default_map.get('args', [])

    import subprocess
    g = ctx.obj
    _quit = []
    phantomjs_fetcher = os.path.join(
        os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
    cmd = [phantomjs_path,
           # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
           #'--load-images=false',
           '--ssl-protocol=any',
           '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]

    try:
        _phantomjs = subprocess.Popen(cmd)
    except OSError:
        logging.warning('phantomjs not found, continuing without it.')
        return None

    def quit(*args, **kwargs):
        _quit.append(1)
        _phantomjs.kill()
        _phantomjs.wait()
        logging.info('phantomjs exited.')

    if not g.get('phantomjs_proxy'):
        g['phantomjs_proxy'] = '127.0.0.1:%s' % port

    phantomjs = utils.ObjectDict(port=port, quit=quit)
    g.instances.append(phantomjs)
    if g.get('testing_mode'):
        return phantomjs

    while True:
        _phantomjs.wait()
        if _quit or not auto_restart:
            break
        _phantomjs = subprocess.Popen(cmd)


@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=1, help='instance num of processor')
@click.option('--result-worker-num', default=1,
              help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each component in a thread or subprocess. '
              'always uses threads on windows.')
@click.pass_context
def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
    """
    Run all the components in subprocess or thread
    """

    ctx.obj['debug'] = False
    g = ctx.obj

    # FIXME: py34 cannot run components with threads
    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    threads = []

    try:
        # phantomjs
        if not g.get('phantomjs_proxy'):
            phantomjs_config = g.config.get('phantomjs', {})
            phantomjs_config.setdefault('auto_restart', True)
            threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
            time.sleep(2)
            if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor, **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))

        # running webui in main thread to make it exitable
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        ctx.invoke(webui, **webui_config)
    finally:
        # exit components running in threads
        for each in g.instances:
            each.quit()

        # exit components running in subprocesses
        for each in threads:
            if not each.is_alive():
                continue
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join()


@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=2, help='instance num of processor')
@click.option('--result-worker-num', default=1, help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each component in a thread or subprocess. '
              'always uses threads on windows.')
@click.option('--total', default=10000, help="total url in test page")
@click.option('--show', default=20, help="show how many urls in a page")
@click.option('--taskdb-bench', default=False, is_flag=True,
              help="only run taskdb bench test")
@click.option('--message-queue-bench', default=False, is_flag=True,
              help="only run message queue bench test")
@click.option('--all-bench', default=False, is_flag=True,
              help="only run all bench test")
@click.pass_context
def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
          taskdb_bench, message_queue_bench, all_bench):
    """
    Run Benchmark test.
    In bench mode, an in-memory sqlite database is used instead of an on-disk one.
    """
    from pyspider.libs import bench
    from pyspider.webui import bench_test  # flake8: noqa

    ctx.obj['debug'] = False
    g = ctx.obj
    if result_worker_num == 0:
        g['processor2result'] = None

    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    all_test = not taskdb_bench and not message_queue_bench and not all_bench

    # test taskdb
    if all_test or taskdb_bench:
        bench.bench_test_taskdb(g.taskdb)
    # test message queue
    if all_test or message_queue_bench:
        bench.bench_test_message_queue(g.scheduler2fetcher)
    # test all
    if not all_test and not all_bench:
        return

    project_name = '__bench_test__'

    def clear_project():
        g.taskdb.drop(project_name)
        g.projectdb.drop(project_name)
        g.resultdb.drop(project_name)

    clear_project()
    g.projectdb.insert(project_name, {
        'name': project_name,
        'status': 'RUNNING',
        'script': bench.bench_script % {'total': total, 'show': show},
        'rate': total,
        'burst': total,
        'updatetime': time.time()
    })

    # disable log
    logging.getLogger().setLevel(logging.ERROR)
    logging.getLogger('scheduler').setLevel(logging.ERROR)
    logging.getLogger('fetcher').setLevel(logging.ERROR)
    logging.getLogger('processor').setLevel(logging.ERROR)
    logging.getLogger('result').setLevel(logging.ERROR)
    logging.getLogger('webui').setLevel(logging.ERROR)
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    try:
        threads = []

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker,
                                  result_cls='pyspider.libs.bench.BenchResultWorker',
                                  **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor,
                                  processor_cls='pyspider.libs.bench.BenchProcessor',
                                  **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher,
                                  fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                  **fetcher_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        scheduler_config.setdefault('xmlrpc_port', 23333)
        threads.append(run_in(ctx.invoke, scheduler,
                              scheduler_cls='pyspider.libs.bench.BenchScheduler',
                              **scheduler_config))
        scheduler_rpc = connect_rpc(ctx, None,
                                    'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)

        # webui
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        threads.append(run_in(ctx.invoke, webui, **webui_config))

        # wait for the bench test to finish
        while True:
            time.sleep(1)
            if scheduler_rpc.size() == 0:
                break
    finally:
        # exit components running in threads
        for each in g.instances:
            each.quit()

        # exit components running in subprocesses
        for each in threads:
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join(1)

        clear_project()


@cli.command()
@click.option('-i', '--interactive', default=False, is_flag=True,
              help='enable interactive mode, in which you can choose the url to crawl.')
@click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
              help='enable phantomjs; spawns a phantomjs subprocess')
@click.argument('scripts', nargs=-1)
@click.pass_context
def one(ctx, interactive, enable_phantomjs, scripts):
    """
    One mode not only means all-in-one, it runs everything in one process on
    top of tornado.ioloop, for debugging purposes.
    """

    ctx.obj['debug'] = False
    g = ctx.obj
    g['testing_mode'] = True

    if scripts:
        from pyspider.database.local.projectdb import ProjectDB
        g['projectdb'] = ProjectDB(scripts)
        if g.get('is_taskdb_default'):
            g['taskdb'] = connect_database('sqlite+taskdb://')
        if g.get('is_resultdb_default'):
            g['resultdb'] = None

    if enable_phantomjs:
        phantomjs_config = g.config.get('phantomjs', {})
        phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
        if phantomjs_obj:
            g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
    else:
        phantomjs_obj = None

    result_worker_config = g.config.get('result_worker', {})
    if g.resultdb is None:
        result_worker_config.setdefault('result_cls',
                                        'pyspider.result.OneResultWorker')
    result_worker_obj = ctx.invoke(result_worker, **result_worker_config)

    processor_config = g.config.get('processor', {})
    processor_config.setdefault('enable_stdout_capture', False)
    processor_obj = ctx.invoke(processor, **processor_config)

    fetcher_config = g.config.get('fetcher', {})
    fetcher_config.setdefault('xmlrpc', False)
    fetcher_obj = ctx.invoke(fetcher, **fetcher_config)

    scheduler_config = g.config.get('scheduler', {})
    scheduler_config.setdefault('xmlrpc', False)
    scheduler_config.setdefault('scheduler_cls',
                                'pyspider.scheduler.OneScheduler')
    scheduler_obj = ctx.invoke(scheduler, **scheduler_config)

    scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                           fetcher=fetcher_obj,
                           processor=processor_obj,
                           result_worker=result_worker_obj,
                           interactive=interactive)
    if scripts:
        for project in g.projectdb.projects:
            scheduler_obj.trigger_on_start(project)

    try:
        scheduler_obj.run()
    finally:
        scheduler_obj.quit()
        if phantomjs_obj:
            phantomjs_obj.quit()


@cli.command()
@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
@click.argument('project', nargs=1)
@click.argument('message', nargs=1)
@click.pass_context
def send_message(ctx, scheduler_rpc, project, message):
    """
    Send a message to a project from the command line.
    """
    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    if scheduler_rpc is None:
        scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')

    return scheduler_rpc.send_task({
        'taskid': utils.md5string('data:,on_message'),
        'project': project,
        'url': 'data:,on_message',
        'fetch': {
            'save': ('__command__', message),
        },
        'process': {
            'callback': '_on_message',
        }
    })


def main():
    cli()

if __name__ == '__main__':
    main()
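Applying the advice above to the flagged all() command: its four component-startup stanzas (result worker, processor, fetcher, scheduler) repeat one shape — read a config section, set defaults, start N copies. A minimal Extract Method sketch; the helper name and signature are hypothetical, not part of pyspider:

def start_components(ctx, run_in, threads, command, config, num=1, **defaults):
    """Hypothetical helper: start `num` copies of a component via run_in."""
    for key, value in defaults.items():
        config.setdefault(key, value)
    for _ in range(num):
        threads.append(run_in(ctx.invoke, command, **config))

# all() would then shrink to a handful of calls, e.g.:
# start_components(ctx, run_in, threads, result_worker,
#                  g.config.get('result_worker', {}), result_worker_num)
# start_components(ctx, run_in, threads, fetcher,
#                  g.config.get('fetcher', {}), fetcher_num,
#                  xmlrpc_host='127.0.0.1')

Each extraction of this kind removes branches and lines from all(), pushing both the cc 13 and loc 73 figures down.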