Completed
Push — master ( e6dbce...addc19 )
by Roy
02:40
created

webui()   F

Complexity

Conditions 15

Size

Total Lines 78

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 15
c 1
b 1
f 0
dl 0
loc 78
rs 2.1566

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, Replace Temp with Query, and Decompose Conditional.

Complexity

Complex functions like webui() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to find such a component is to look for statements and variables that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-03-05 00:11:49
7
8
9
import os
10
import sys
11
import six
12
import copy
13
import time
14
import shutil
15
import logging
16
import logging.config
17
18
import click
19
import pyspider
20
from pyspider.message_queue import connect_message_queue
21
from pyspider.database import connect_database
22
from pyspider.libs import utils
23
24
25
def read_config(ctx, param, value):
    """Click callback for ``-c/--config``.

    Load a JSON config file, recursively normalise every key from
    ``kebab-case`` to ``snake_case`` (command-line options use dashes,
    python identifiers use underscores), install the result as the click
    context's ``default_map`` so it supplies defaults for all subcommands,
    and return it.

    Returns an empty dict when no config file was given.
    """
    if not value:
        return {}
    import json

    def underline_dict(d):
        # Recursively rewrite dict keys ('-' -> '_'); non-dict values pass
        # through untouched.  Plain d.items() works on both py2 and py3, so
        # six.iteritems is unnecessary here.
        if not isinstance(d, dict):
            return d
        return dict((k.replace('-', '_'), underline_dict(v)) for k, v in d.items())

    config = underline_dict(json.load(value))
    ctx.default_map = config
    return config
38
39
40
def connect_db(ctx, param, value):
    """Click callback: wrap a database url in a lazy ``utils.Get`` proxy.

    The actual connection is deferred until the proxy is first used;
    an empty value yields ``None``.
    """
    if value:
        return utils.Get(lambda: connect_database(value))
    return None
44
45
46
def load_cls(ctx, param, value):
    """Click callback: resolve a dotted-path string to the object it names.

    Values that are not strings (e.g. an already-loaded class) are
    returned unchanged.
    """
    if not isinstance(value, six.string_types):
        return value
    return utils.load_object(value)
50
51
52
def connect_rpc(ctx, param, value):
    """Click callback: build an XML-RPC server proxy for the given url.

    Returns ``None`` for an empty value.  ``allow_none=True`` lets the
    proxy marshal ``None`` values over XML-RPC.
    """
    if not value:
        return None
    try:
        # py2/py3 compatible import path
        from six.moves import xmlrpc_client
    except ImportError:
        import xmlrpclib as xmlrpc_client
    proxy = xmlrpc_client.ServerProxy(value, allow_none=True)
    return proxy
60
61
62
@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.
    """
    # Group-level entry point: resolves databases, message queues and the
    # phantomjs proxy from CLI options / env vars, stashes everything on
    # ctx.obj for subcommands, and runs `all` when no subcommand was given.
    if kwargs['add_sys_path']:
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # get db from env
    # Resolution order per db: explicit option/env (already a proxy via
    # connect_db) > linked MySQL container > linked MongoDB container >
    # in-memory/bench sqlite (bench subcommand) > on-disk sqlite default.
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            continue
        # MYSQL_*/MONGODB_* variables follow docker's legacy container-link
        # naming scheme -- TODO confirm against deployment docs.
        if os.environ.get('MYSQL_NAME'):
            # lambda db=db: early-binds the loop variable so each proxy
            # connects to its own database, not the last one in the loop.
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif ctx.invoked_subcommand == 'bench':
            # bench mode: wipe and recreate a dedicated data dir, and use
            # in-memory sqlite for taskdb/resultdb.
            if kwargs['data_path'] == './data':
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                os.mkdir(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            else:
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                    db, kwargs['data_path'], db[:-2])))
        else:
            # default: on-disk sqlite under data_path; db[:-2] strips the
            # trailing 'db' ('taskdb' -> 'task.db', etc.)
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            # remember it was a default so subcommands (e.g. `one`) may
            # swap it out.
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.mkdir(kwargs['data_path'])

    # message queue, compatible with old version
    # precedence: --message-queue > deprecated --amqp-url > linked
    # RabbitMQ container > deprecated --beanstalk.
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        # %%2F is an escaped '%2F' (url-encoded '/') -- the default vhost.
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
    elif kwargs.get('beanstalk'):
        kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']

    # Build the five inter-component queues.  With an external MQ the
    # connection is deferred (utils.Get) so each subprocess connects
    # itself; the builtin queue must be created here so it can be shared.
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy
    # fall back to a linked phantomjs container; strip the 'tcp://' scheme
    # from docker's PHANTOMJS_PORT_25555_TCP value.
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    # Share everything with subcommands via the click context object.
    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    # `pyspider` with no subcommand behaves like `pyspider all`
    # (suppressed under testing_mode so tests can drive ctx directly).
    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx
167
168
169
@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=True)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
              help='size limit of task queue for each project, '
              'tasks will been ignored when overflow')
@click.option('--delete-time', default=24 * 60 * 60,
              help='delete time before marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
              help='scheduler class to be used.')
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
              inqueue_limit, delete_time, active_tasks, loop_limit, scheduler_cls,
              threads, get_object=False):
    """
    Run Scheduler, only one scheduler is allowed.
    """
    g = ctx.obj
    # scheduler_cls may still be a dotted path when invoked via ctx.invoke,
    # so resolve it again here.
    scheduler_class = load_cls(None, None, scheduler_cls)

    init_kwargs = {
        'taskdb': g.taskdb,
        'projectdb': g.projectdb,
        'resultdb': g.resultdb,
        'newtask_queue': g.newtask_queue,
        'status_queue': g.status_queue,
        'out_queue': g.scheduler2fetcher,
        'data_path': g.get('data_path', 'data'),
    }
    # --threads arrives as a string; only forward it when set.
    if threads:
        init_kwargs['threads'] = int(threads)

    instance = scheduler_class(**init_kwargs)
    instance.INQUEUE_LIMIT = inqueue_limit
    instance.DELETE_TIME = delete_time
    instance.ACTIVE_TASKS = active_tasks
    instance.LOOP_LIMIT = loop_limit

    # register for shutdown by `all`/`bench`; return early when the caller
    # just wants the object instead of a running loop.
    g.instances.append(instance)
    if g.get('testing_mode') or get_object:
        return instance

    if xmlrpc:
        utils.run_in_thread(instance.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    instance.run()
212
213
214
@cli.command()
215
@click.option('--xmlrpc/--no-xmlrpc', default=False)
216
@click.option('--xmlrpc-host', default='0.0.0.0')
217
@click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
218
@click.option('--poolsize', default=100, help="max simultaneous fetches")
219
@click.option('--proxy', help="proxy host:port")
220
@click.option('--user-agent', help='user agent')
221
@click.option('--timeout', help='default fetch timeout')
222
@click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
223
              help='Fetcher class to be used.')
224
@click.pass_context
225
def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
226
            timeout, fetcher_cls, async=True, get_object=False, g=None):
227
    """
228
    Run Fetcher.
229
    """
230
    g = g or ctx.obj
231
    Fetcher = load_cls(None, None, fetcher_cls)
232
233
    fetcher = Fetcher(inqueue=g.scheduler2fetcher, outqueue=g.fetcher2processor,
234
                      poolsize=poolsize, proxy=proxy, async=async)
235
    fetcher.phantomjs_proxy = g.phantomjs_proxy
236
    if user_agent:
237
        fetcher.user_agent = user_agent
238
    if timeout:
239
        fetcher.default_options = copy.deepcopy(fetcher.default_options)
240
        fetcher.default_options['timeout'] = timeout
241
242
    g.instances.append(fetcher)
243
    if g.get('testing_mode') or get_object:
244
        return fetcher
245
246
    if xmlrpc:
247
        utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
248
    fetcher.run()
249
250
251
@cli.command()
@click.option('--processor-cls', default='pyspider.processor.Processor',
              callback=load_cls, help='Processor class to be used.')
@click.pass_context
def processor(ctx, processor_cls, enable_stdout_capture=True, get_object=False):
    """
    Run Processor.
    """
    g = ctx.obj
    processor_class = load_cls(None, None, processor_cls)

    instance = processor_class(
        projectdb=g.projectdb,
        inqueue=g.fetcher2processor,
        status_queue=g.status_queue,
        newtask_queue=g.newtask_queue,
        result_queue=g.processor2result,
        enable_stdout_capture=enable_stdout_capture,
    )

    # register for shutdown; return the bare object when requested.
    g.instances.append(instance)
    if g.get('testing_mode') or get_object:
        return instance

    instance.run()
272
273
274
@cli.command()
@click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
              help='ResultWorker class to be used.')
@click.pass_context
def result_worker(ctx, result_cls, get_object=False):
    """
    Run result worker.
    """
    g = ctx.obj
    worker_class = load_cls(None, None, result_cls)

    instance = worker_class(resultdb=g.resultdb, inqueue=g.processor2result)

    # register for shutdown; return the bare object when requested.
    g.instances.append(instance)
    if g.get('testing_mode') or get_object:
        return instance

    instance.run()
292
293
294
@cli.command()
@click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
              help='webui bind to host')
@click.option('--port', default=5000, envvar='WEBUI_PORT',
              help='webui bind to host')
@click.option('--cdn', default='//cdnjscn.b0.upaiyun.com/libs/',
              help='js/css cdn server')
@click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
@click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
@click.option('--max-rate', type=float, help='max rate for each project')
@click.option('--max-burst', type=float, help='max burst for each project')
@click.option('--username', envvar='WEBUI_USERNAME',
              help='username of lock -ed projects')
@click.option('--password', envvar='WEBUI_PASSWORD',
              help='password of lock -ed projects')
@click.option('--need-auth', is_flag=True, default=False, help='need username and password')
@click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
              help='webui Flask Application instance to be used.')
@click.pass_context
def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
          username, password, need_auth, webui_instance, get_object=False):
    """
    Run WebUI
    """
    # Configure the Flask app with databases, auth, queues, a fetch
    # callable and a scheduler RPC proxy, then serve it.
    app = load_cls(None, None, webui_instance)

    g = ctx.obj
    app.config['taskdb'] = g.taskdb
    app.config['projectdb'] = g.projectdb
    app.config['resultdb'] = g.resultdb
    app.config['cdn'] = cdn

    # optional settings: only override the app's values when given.
    if max_rate:
        app.config['max_rate'] = max_rate
    if max_burst:
        app.config['max_burst'] = max_burst
    if username:
        app.config['webui_username'] = username
    if password:
        app.config['webui_password'] = password
    app.config['need_auth'] = need_auth

    # inject queues for webui
    # missing queues become None rather than raising.
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        app.config['queues'][name] = getattr(g, name, None)

    # fetcher rpc
    if isinstance(fetcher_rpc, six.string_types):
        # remote fetcher: results come back msgpack-encoded over XML-RPC.
        import umsgpack
        fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
        app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
    else:
        # get fetcher instance for webui
        # Build a private, synchronous fetcher on a deep-copied context
        # with the scheduler/processor queues nulled out, so webui fetches
        # don't interfere with the pipeline.
        fetcher_config = g.config.get('fetcher', {})
        mock_g = copy.deepcopy(g)
        mock_g['scheduler2fetcher'] = None
        mock_g['fetcher2processor'] = None
        webui_fetcher = ctx.invoke(fetcher, async=False, get_object=True, g=mock_g, **fetcher_config)

        app.config['fetch'] = lambda x: webui_fetcher.fetch(x)

    # scheduler rpc: explicit url > linked scheduler container (strip the
    # 'tcp://' scheme from docker's env value) > localhost default >
    # already-connected proxy object.
    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    elif scheduler_rpc is None:
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
    else:
        app.config['scheduler_rpc'] = scheduler_rpc

    app.debug = g.debug
    # register for shutdown; return the bare app when requested.
    g.instances.append(app)
    if g.get('testing_mode') or get_object:
        return app

    app.run(host=host, port=port)
372
373
374
@cli.command()
@click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
@click.option('--port', default=25555, help='phantomjs port')
@click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
@click.argument('args', nargs=-1)
@click.pass_context
def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
    """
    Run phantomjs fetcher if phantomjs is installed.
    """
    # Spawn the bundled phantomjs_fetcher.js as a subprocess, register a
    # quit handle, and (unless in testing_mode) babysit the process,
    # restarting it on crash when auto_restart is set.
    # extra CLI args may also come from the config file's default_map.
    args = args or ctx.default_map and ctx.default_map.get('args', [])

    import subprocess
    g = ctx.obj
    # _quit is a mutable flag shared with the quit() closure below.
    _quit = []
    phantomjs_fetcher = os.path.join(
        os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
    cmd = [phantomjs_path,
           # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
           #'--load-images=false',
           '--ssl-protocol=any',
           '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]

    try:
        _phantomjs = subprocess.Popen(cmd)
    except OSError:
        # phantomjs binary missing: degrade gracefully, the rest of the
        # system runs without javascript rendering.
        logging.warning('phantomjs not found, continue running without it.')
        return None

    def quit(*args, **kwargs):
        # mark intentional shutdown (so the restart loop below exits),
        # then kill and reap the child.
        _quit.append(1)
        _phantomjs.kill()
        _phantomjs.wait()
        logging.info('phantomjs exited.')

    # advertise this instance as the default phantomjs proxy if none set.
    if not g.get('phantomjs_proxy'):
        g['phantomjs_proxy'] = '127.0.0.1:%s' % port

    phantomjs = utils.ObjectDict(port=port, quit=quit)
    g.instances.append(phantomjs)
    if g.get('testing_mode'):
        return phantomjs

    # supervise: wait for exit; restart unless quit() was called or
    # auto_restart is off.
    while True:
        _phantomjs.wait()
        if _quit or not auto_restart:
            break
        _phantomjs = subprocess.Popen(cmd)
422
423
424
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=1, help='instance num of processor')
@click.option('--result-worker-num', default=1,
              help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each components in thread or subprocess. '
              'always using thread for windows.')
@click.pass_context
def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
    """
    Run all the components in subprocess or thread
    """
    # Start phantomjs, result workers, processors, fetchers and the
    # scheduler in background threads/subprocesses, then run webui in the
    # foreground; tear everything down on exit.

    ctx.obj['debug'] = False
    g = ctx.obj

    # FIXME: py34 cannot run components with threads
    # windows (os.name == 'nt') always uses threads.
    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    threads = []

    try:
        # phantomjs
        # only spawn a local phantomjs when no external proxy is set;
        # give it 2s to come up before advertising it.
        if not g.get('phantomjs_proxy'):
            phantomjs_config = g.config.get('phantomjs', {})
            phantomjs_config.setdefault('auto_restart', True)
            threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
            time.sleep(2)
            if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor, **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))

        # running webui in main thread to make it exitable
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        ctx.invoke(webui, **webui_config)
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if not each.is_alive():
                continue
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join()
497
498
499
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=2, help='instance num of processor')
@click.option('--result-worker-num', default=1, help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each components in thread or subprocess. '
              'always using thread for windows.')
@click.option('--total', default=10000, help="total url in test page")
@click.option('--show', default=20, help="show how many urls in a page")
@click.option('--taskdb-bench', default=False, is_flag=True,
              help="only run taskdb bench test")
@click.option('--message-queue-bench', default=False, is_flag=True,
              help="only run message queue bench test")
@click.option('--all-bench', default=False, is_flag=True,
              help="only run all bench test")
@click.pass_context
def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
          taskdb_bench, message_queue_bench, all_bench):
    """
    Run Benchmark test.
    In bench mode, in-memory sqlite database is used instead of on-disk sqlite database.
    """
    # Runs up to three benchmarks: taskdb, message queue, and a full
    # end-to-end crawl of a synthetic project using Bench* component
    # subclasses.
    from pyspider.libs import bench
    from pyspider.webui import bench_test  # flake8: noqa

    ctx.obj['debug'] = False
    g = ctx.obj
    # with no result workers, disconnect the processor->result queue so
    # results are simply dropped.
    if result_worker_num == 0:
        g['processor2result'] = None

    # windows always uses threads (same constraint as `all`).
    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    # no explicit flag given -> run every benchmark.
    all_test = not taskdb_bench and not message_queue_bench and not all_bench

    # test taskdb
    if all_test or taskdb_bench:
        bench.bench_test_taskdb(g.taskdb)
    # test message queue
    if all_test or message_queue_bench:
        bench.bench_test_message_queue(g.scheduler2fetcher)
    # test all
    if not all_test and not all_bench:
        return

    project_name = '__bench_test__'

    def clear_project():
        # remove the synthetic project from all three databases.
        g.taskdb.drop(project_name)
        g.projectdb.drop(project_name)
        g.resultdb.drop(project_name)

    clear_project()
    # rate/burst are set to `total` so throttling never limits the bench.
    g.projectdb.insert(project_name, {
        'name': project_name,
        'status': 'RUNNING',
        'script': bench.bench_script % {'total': total, 'show': show},
        'rate': total,
        'burst': total,
        'updatetime': time.time()
    })

    # disable log
    logging.getLogger().setLevel(logging.ERROR)
    logging.getLogger('scheduler').setLevel(logging.ERROR)
    logging.getLogger('fetcher').setLevel(logging.ERROR)
    logging.getLogger('processor').setLevel(logging.ERROR)
    logging.getLogger('result').setLevel(logging.ERROR)
    logging.getLogger('webui').setLevel(logging.ERROR)
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    try:
        threads = []

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker,
                                  result_cls='pyspider.libs.bench.BenchResultWorker',
                                  **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor,
                                  processor_cls='pyspider.libs.bench.BenchProcessor',
                                  **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher,
                                  fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                  **fetcher_config))

        # webui
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        threads.append(run_in(ctx.invoke, webui, **webui_config))

        # crude startup synchronisation: give the components a moment to
        # come up before (and after) starting the scheduler.
        time.sleep(1)

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        scheduler_config.setdefault('xmlrpc_port', 23333)
        threads.append(run_in(ctx.invoke, scheduler,
                              scheduler_cls='pyspider.libs.bench.BenchScheduler',
                              **scheduler_config))
        scheduler_rpc = connect_rpc(ctx, None,
                                    'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)

        time.sleep(2)

        # seed the crawl with the project's on_start task.
        scheduler_rpc.newtask({
            "project": project_name,
            "taskid": "on_start",
            "url": "data:,on_start",
            "process": {
                "callback": "on_start",
            },
        })

        # wait bench test finished
        # poll the scheduler's task-queue size until it drains.
        while True:
            time.sleep(1)
            if scheduler_rpc.size() == 0:
                break
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join(1)

        clear_project()
643
644
645
@cli.command()
@click.option('-i', '--interactive', default=False, is_flag=True,
              help='enable interactive mode, you can choose crawl url.')
@click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
              help='enable phantomjs, will spawn a subprocess for phantomjs')
@click.argument('scripts', nargs=-1)
@click.pass_context
def one(ctx, interactive, enable_phantomjs, scripts):
    """
    One mode not only means all-in-one, it runs every thing in one process over
    tornado.ioloop, for debug purpose
    """
    # Builds all components in-process (testing_mode makes each command
    # return its object instead of blocking) and wires them into a
    # OneScheduler driving a single ioloop.

    ctx.obj['debug'] = False
    g = ctx.obj
    # testing_mode makes scheduler/fetcher/... below return objects.
    g['testing_mode'] = True

    if scripts:
        # scripts given on the command line: use a local, file-backed
        # projectdb and reset default databases accordingly.
        from pyspider.database.local.projectdb import ProjectDB
        g['projectdb'] = ProjectDB(scripts)
        if g.get('is_taskdb_default'):
            g['taskdb'] = connect_database('sqlite+taskdb://')
        if g.get('is_resultdb_default'):
            g['resultdb'] = None

    if enable_phantomjs:
        # phantomjs() returns None when the binary is missing.
        phantomjs_config = g.config.get('phantomjs', {})
        phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
        if phantomjs_obj:
            g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
    else:
        phantomjs_obj = None

    result_worker_config = g.config.get('result_worker', {})
    # without a resultdb, print results via OneResultWorker instead.
    if g.resultdb is None:
        result_worker_config.setdefault('result_cls',
                                        'pyspider.result.OneResultWorker')
    result_worker_obj = ctx.invoke(result_worker, **result_worker_config)

    processor_config = g.config.get('processor', {})
    # keep stdout free for interactive use.
    processor_config.setdefault('enable_stdout_capture', False)
    processor_obj = ctx.invoke(processor, **processor_config)

    fetcher_config = g.config.get('fetcher', {})
    fetcher_config.setdefault('xmlrpc', False)
    fetcher_obj = ctx.invoke(fetcher, **fetcher_config)

    scheduler_config = g.config.get('scheduler', {})
    scheduler_config.setdefault('xmlrpc', False)
    scheduler_config.setdefault('scheduler_cls',
                                'pyspider.scheduler.OneScheduler')
    scheduler_obj = ctx.invoke(scheduler, **scheduler_config)

    # hand every component to the scheduler, which drives them all over
    # the fetcher's ioloop.
    scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                           fetcher=fetcher_obj,
                           processor=processor_obj,
                           result_worker=result_worker_obj,
                           interactive=interactive)
    if scripts:
        # kick off each supplied script's on_start.
        for project in g.projectdb.projects:
            scheduler_obj.trigger_on_start(project)

    try:
        scheduler_obj.run()
    finally:
        scheduler_obj.quit()
        if phantomjs_obj:
            phantomjs_obj.quit()
713
714
715
@cli.command()
@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
@click.argument('project', nargs=1)
@click.argument('message', nargs=1)
@click.pass_context
def send_message(ctx, scheduler_rpc, project, message):
    """
    Send Message to project from command line
    """
    # Resolve the scheduler RPC proxy: explicit url string > linked
    # scheduler container (strip docker's 'tcp://' scheme) > localhost.
    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None:
        if os.environ.get('SCHEDULER_NAME'):
            addr = os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]
            scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % addr)
        else:
            scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')

    # Craft a synthetic task whose _on_message callback delivers the
    # payload to the target project.
    task = {
        'taskid': utils.md5string('data:,on_message'),
        'project': project,
        'url': 'data:,on_message',
        'fetch': {
            'save': ('__command__', message),
        },
        'process': {
            'callback': '_on_message',
        }
    }
    return scheduler_rpc.send_task(task)
743
744
745
def main():
    """Console-script entry point: dispatch to the ``cli`` click group."""
    cli()

if __name__ == '__main__':
    main()
750