Completed
Push — master ( ac2460...7c57cb )
by Roy
12:37 queued 01:23
created

fetcher()   C

Complexity

Conditions 7

Size

Total Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 7
dl 0
loc 41
rs 5.5
c 2
b 1
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-03-05 00:11:49
7
8
9
import os
10
import sys
11
import six
12
import copy
13
import time
14
import shutil
15
import logging
16
import logging.config
17
18
import click
19
import pyspider
20
from pyspider.message_queue import connect_message_queue
21
from pyspider.database import connect_database
22
from pyspider.libs import utils
23
24
25
def read_config(ctx, param, value):
    """Click callback for ``-c/--config``: load a JSON config file.

    Hyphens in keys are recursively rewritten to underscores so they line
    up with click's parameter names, the result is installed as
    ``ctx.default_map`` (per-subcommand defaults) and also returned.

    Returns an empty dict when no config file was given.
    """
    if not value:
        return {}
    import json

    def underline_dict(d):
        if not isinstance(d, dict):
            return d
        # plain dict.items() behaves identically on Python 2 and 3 here,
        # so six.iteritems is unnecessary
        return {k.replace('-', '_'): underline_dict(v) for k, v in d.items()}

    config = underline_dict(json.load(value))
    ctx.default_map = config
    return config
38
39
40
def connect_db(ctx, param, value):
    """Click callback: lazily connect to the database named by *value*.

    Returns None when no database url was given; otherwise a ``utils.Get``
    proxy that opens the connection on first attribute access.
    """
    if value:
        return utils.Get(lambda: connect_database(value))
    return None
44
45
46
def load_cls(ctx, param, value):
    """Click callback: resolve a dotted-path string to the object it names.

    Anything that is not a string is assumed to be already loaded and is
    passed through untouched.
    """
    if not isinstance(value, six.string_types):
        return value
    return utils.load_object(value)
50
51
52
def connect_rpc(ctx, param, value):
    """Click callback: build an XML-RPC proxy for *value* (a URL).

    Returns None when no address was given.
    """
    if value:
        try:
            from six.moves import xmlrpc_client
        except ImportError:
            # Python 2 without six.moves
            import xmlrpclib as xmlrpc_client
        return xmlrpc_client.ServerProxy(value, allow_none=True)
    return None
60
61
62
@click.group(invoke_without_command=True)
@click.option('-c', '--config', callback=read_config, type=click.File('r'),
              help='a json file with default values for subcommands. {"webui": {"port":5001}}')
@click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
              help="logging config file for built-in python logging module", show_default=True)
@click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
@click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
              help='maxsize of queue')
@click.option('--taskdb', envvar='TASKDB', callback=connect_db,
              help='database url for taskdb, default: sqlite')
@click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
              help='database url for projectdb, default: sqlite')
@click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
              help='database url for resultdb, default: sqlite')
@click.option('--message-queue', envvar='AMQP_URL',
              help='connection url to message queue, '
              'default: builtin multiprocessing.Queue')
@click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
              'please use --message-queue instead.')
@click.option('--beanstalk', envvar='BEANSTALK_HOST',
              help='[deprecated] beanstalk config for beanstalk queue. '
              'please use --message-queue instead.')
@click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
@click.option('--data-path', default='./data', help='data dir path')
@click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
              help='add current working directory to python lib search path')
@click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
@click.pass_context
def cli(ctx, **kwargs):
    """
    A powerful spider system in python.

    Root command group: resolves the three databases, the five
    inter-component message queues and the phantomjs proxy from CLI
    options / environment variables, stores everything on ``ctx.obj``
    for the subcommands, and invokes the ``all`` subcommand when no
    subcommand was given.
    """
    if kwargs['add_sys_path']:
        sys.path.append(os.getcwd())

    logging.config.fileConfig(kwargs['logging_config'])

    # get db from env
    for db in ('taskdb', 'projectdb', 'resultdb'):
        if kwargs[db] is not None:
            # already set via option/envvar (connect_db callback)
            continue
        # docker-link style env vars take precedence: MYSQL_*, then MONGODB_*
        if os.environ.get('MYSQL_NAME'):
            # db=db binds the loop variable now; the lazy utils.Get wrapper
            # would otherwise evaluate `db` after the loop has finished
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'sqlalchemy+mysql+%s://%s:%s/%s' % (
                    db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                    os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
        elif os.environ.get('MONGODB_NAME'):
            kwargs[db] = utils.Get(lambda db=db: connect_database(
                'mongodb+%s://%s:%s/%s' % (
                    db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                    os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
        elif ctx.invoked_subcommand == 'bench':
            # bench mode: wipe/recreate a dedicated data dir and use
            # in-memory sqlite for taskdb/resultdb
            if kwargs['data_path'] == './data':
                kwargs['data_path'] += '/bench'
                shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                os.mkdir(kwargs['data_path'])
            if db in ('taskdb', 'resultdb'):
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
            else:
                kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                    db, kwargs['data_path'], db[:-2])))
        else:
            # default: on-disk sqlite under data_path; remember that this
            # was only a default so subcommands (e.g. `one`) may replace it
            if not os.path.exists(kwargs['data_path']):
                os.mkdir(kwargs['data_path'])
            kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                db, kwargs['data_path'], db[:-2])))
            kwargs['is_%s_default' % db] = True

    # create folder for counter.dump
    if not os.path.exists(kwargs['data_path']):
        os.mkdir(kwargs['data_path'])

    # message queue, compatible with old version
    # (--message-queue wins over deprecated --amqp-url / --beanstalk / env)
    if kwargs.get('message_queue'):
        pass
    elif kwargs.get('amqp_url'):
        kwargs['message_queue'] = kwargs['amqp_url']
    elif os.environ.get('RABBITMQ_NAME'):
        kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                   ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
    elif kwargs.get('beanstalk'):
        kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']

    # wire the five inter-component queues: lazy (utils.Get) when backed by
    # an external broker, eager in-process queues otherwise
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        if kwargs.get('message_queue'):
            kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
        else:
            kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                 kwargs['queue_maxsize'])

    # phantomjs-proxy
    if kwargs.get('phantomjs_proxy'):
        pass
    elif os.environ.get('PHANTOMJS_NAME'):
        # linked phantomjs container: strip the 'tcp://' scheme prefix
        kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]

    # expose everything to subcommands via ctx.obj
    ctx.obj = utils.ObjectDict(ctx.obj or {})
    ctx.obj['instances'] = []
    ctx.obj.update(kwargs)

    # no subcommand given: run every component (`all` here is the
    # subcommand defined below, shadowing the builtin)
    if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
        ctx.invoke(all)
    return ctx
167
168
169
@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=True)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
@click.option('--inqueue-limit', default=0,
              help='size limit of task queue for each project, '
              'tasks will been ignored when overflow')
@click.option('--delete-time', default=24 * 60 * 60,
              help='delete time before marked as delete')
@click.option('--active-tasks', default=100, help='active log size')
@click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
@click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
              help='scheduler class to be used.')
@click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
@click.pass_context
def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
              inqueue_limit, delete_time, active_tasks, loop_limit, scheduler_cls,
              threads, get_object=False):
    """
    Run Scheduler, only one scheduler is allowed.

    Wires the scheduler to the shared databases and queues from the
    context, applies the tuning options, then blocks in its run loop
    (unless testing_mode / get_object asks for the bare instance).
    """
    g = ctx.obj
    scheduler_class = load_cls(None, None, scheduler_cls)

    init_kwargs = {
        'taskdb': g.taskdb,
        'projectdb': g.projectdb,
        'resultdb': g.resultdb,
        'newtask_queue': g.newtask_queue,
        'status_queue': g.status_queue,
        'out_queue': g.scheduler2fetcher,
        'data_path': g.get('data_path', 'data'),
    }
    if threads:
        init_kwargs['threads'] = int(threads)

    sched = scheduler_class(**init_kwargs)
    sched.INQUEUE_LIMIT = inqueue_limit
    sched.DELETE_TIME = delete_time
    sched.ACTIVE_TASKS = active_tasks
    sched.LOOP_LIMIT = loop_limit

    g.instances.append(sched)
    if g.get('testing_mode') or get_object:
        return sched

    if xmlrpc:
        utils.run_in_thread(sched.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    sched.run()
212
213
214
@cli.command()
@click.option('--xmlrpc/--no-xmlrpc', default=False)
@click.option('--xmlrpc-host', default='0.0.0.0')
@click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
@click.option('--poolsize', default=100, help="max simultaneous fetches")
@click.option('--proxy', help="proxy host:port")
@click.option('--user-agent', help='user agent')
@click.option('--timeout', help='default fetch timeout')
@click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
              help='Fetcher class to be used.')
@click.pass_context
def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
            timeout, fetcher_cls, async=True, get_object=False, no_input=False):
    """
    Run Fetcher.

    Builds a fetcher attached to the scheduler/processor queues (or to no
    queues when no_input is set, as the webui does) and runs it, optionally
    exposing an XML-RPC control interface.
    """
    # NOTE(review): the `async` parameter above became a reserved keyword in
    # Python 3.7, making this module a SyntaxError there; upstream later
    # renamed it. Left untouched because webui passes it by name.
    g = ctx.obj
    Fetcher = load_cls(None, None, fetcher_cls)

    if no_input:
        # standalone mode: no queues attached (used for webui's internal fetcher)
        inqueue = None
        outqueue = None
    else:
        inqueue = g.scheduler2fetcher
        outqueue = g.fetcher2processor
    fetcher = Fetcher(inqueue=inqueue, outqueue=outqueue,
                      poolsize=poolsize, proxy=proxy, async=async)
    fetcher.phantomjs_proxy = g.phantomjs_proxy
    if user_agent:
        fetcher.user_agent = user_agent
    if timeout:
        # deep-copy before mutating — presumably default_options is shared
        # class-level state; verify on the Fetcher class
        fetcher.default_options = copy.deepcopy(fetcher.default_options)
        fetcher.default_options['timeout'] = timeout

    g.instances.append(fetcher)
    if g.get('testing_mode') or get_object:
        return fetcher

    if xmlrpc:
        utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
    fetcher.run()
255
256
257
@cli.command()
@click.option('--processor-cls', default='pyspider.processor.Processor',
              callback=load_cls, help='Processor class to be used.')
@click.pass_context
def processor(ctx, processor_cls, enable_stdout_capture=True, get_object=False):
    """
    Run Processor.

    Instantiates the processor wired to the shared queues and project
    database, registers it on the context, and blocks in its run loop
    unless the bare object was requested (testing_mode / get_object).
    """
    g = ctx.obj
    processor_class = load_cls(None, None, processor_cls)

    worker = processor_class(
        projectdb=g.projectdb,
        inqueue=g.fetcher2processor,
        status_queue=g.status_queue,
        newtask_queue=g.newtask_queue,
        result_queue=g.processor2result,
        enable_stdout_capture=enable_stdout_capture,
    )

    g.instances.append(worker)
    if g.get('testing_mode') or get_object:
        return worker

    worker.run()
278
279
280
@cli.command()
@click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
              help='ResultWorker class to be used.')
@click.pass_context
def result_worker(ctx, result_cls, get_object=False):
    """
    Run result worker.

    Consumes from the processor2result queue and writes into resultdb;
    registers the worker on the context and blocks in its run loop unless
    the bare object was requested.
    """
    g = ctx.obj
    worker_class = load_cls(None, None, result_cls)

    worker = worker_class(resultdb=g.resultdb, inqueue=g.processor2result)

    g.instances.append(worker)
    if g.get('testing_mode') or get_object:
        return worker

    worker.run()
298
299
300
@cli.command()
@click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
              help='webui bind to host')
# NOTE(review): help text below duplicates --host; presumably meant 'webui bind to port'
@click.option('--port', default=5000, envvar='WEBUI_PORT',
              help='webui bind to host')
@click.option('--cdn', default='//cdnjscn.b0.upaiyun.com/libs/',
              help='js/css cdn server')
@click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
@click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
@click.option('--max-rate', type=float, help='max rate for each project')
@click.option('--max-burst', type=float, help='max burst for each project')
@click.option('--username', envvar='WEBUI_USERNAME',
              help='username of lock -ed projects')
@click.option('--password', envvar='WEBUI_PASSWORD',
              help='password of lock -ed projects')
@click.option('--need-auth', is_flag=True, default=False, help='need username and password')
@click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
              help='webui Flask Application instance to be used.')
@click.pass_context
def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
          username, password, need_auth, webui_instance, get_object=False):
    """
    Run WebUI

    Configures the web application with the shared databases, queues, a
    fetch function and a scheduler RPC proxy, then serves it.
    """
    app = load_cls(None, None, webui_instance)

    g = ctx.obj
    # hand the shared databases and display settings to the web application
    app.config['taskdb'] = g.taskdb
    app.config['projectdb'] = g.projectdb
    app.config['resultdb'] = g.resultdb
    app.config['cdn'] = cdn

    if max_rate:
        app.config['max_rate'] = max_rate
    if max_burst:
        app.config['max_burst'] = max_burst
    if username:
        app.config['webui_username'] = username
    if password:
        app.config['webui_password'] = password
    app.config['need_auth'] = need_auth

    # inject queues for webui
    for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                 'fetcher2processor', 'processor2result'):
        app.config['queues'][name] = getattr(g, name, None)

    # fetcher rpc: either a remote XML-RPC fetcher (responses are
    # umsgpack-encoded) or an in-process fetcher instance
    if isinstance(fetcher_rpc, six.string_types):
        import umsgpack
        fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
        app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
    else:
        # get fetcher instance for webui
        # NOTE(review): `async` is a reserved keyword from Python 3.7 on;
        # this call (and the fetcher signature) require Python < 3.7
        fetcher_config = g.config.get('fetcher', {})
        webui_fetcher = ctx.invoke(fetcher, async=False, get_object=True, no_input=True, **fetcher_config)

        app.config['fetch'] = lambda x: webui_fetcher.fetch(x)

    # scheduler rpc: explicit url > linked docker container env > local default
    if isinstance(scheduler_rpc, six.string_types):
        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
    elif scheduler_rpc is None:
        app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
    else:
        app.config['scheduler_rpc'] = scheduler_rpc

    app.debug = g.debug
    g.instances.append(app)
    if g.get('testing_mode') or get_object:
        return app

    app.run(host=host, port=port)
375
376
377
@cli.command()
@click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
@click.option('--port', default=25555, help='phantomjs port')
@click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
@click.argument('args', nargs=-1)
@click.pass_context
def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
    """
    Run phantomjs fetcher if phantomjs is installed.

    Spawns the bundled phantomjs_fetcher.js as a subprocess, registers a
    quit handle on the context, and (outside testing mode) supervises the
    process, restarting it on crash when auto_restart is set.
    """
    # extra phantomjs args may also come from the config file's default map
    args = args or ctx.default_map and ctx.default_map.get('args', [])

    import subprocess
    g = ctx.obj
    _quit = []  # non-empty once quit() has been called; stops the restart loop
    phantomjs_fetcher = os.path.join(
        os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
    cmd = [phantomjs_path,
           # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
           #'--load-images=false',
           '--ssl-protocol=any',
           '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]

    try:
        _phantomjs = subprocess.Popen(cmd)
    except OSError:
        # phantomjs binary missing: degrade gracefully, no js fetching
        logging.warning('phantomjs not found, continue running without it.')
        return None

    def quit(*args, **kwargs):
        # closes over _phantomjs, so after a restart it kills the
        # most recently spawned process
        _quit.append(1)
        _phantomjs.kill()
        _phantomjs.wait()
        logging.info('phantomjs exited.')

    if not g.get('phantomjs_proxy'):
        g['phantomjs_proxy'] = '127.0.0.1:%s' % port

    phantomjs = utils.ObjectDict(port=port, quit=quit)
    g.instances.append(phantomjs)
    if g.get('testing_mode'):
        return phantomjs

    # supervise: restart on crash unless quit() was requested
    while True:
        _phantomjs.wait()
        if _quit or not auto_restart:
            break
        _phantomjs = subprocess.Popen(cmd)
425
426
427
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=1, help='instance num of processor')
@click.option('--result-worker-num', default=1,
              help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each components in thread or subprocess. '
              'always using thread for windows.')
@click.pass_context
def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
    """
    Run all the components in subprocess or thread

    Starts phantomjs, result workers, processors, fetchers and the
    scheduler in the background, runs webui in the main thread, and tears
    everything down on exit.
    """

    ctx.obj['debug'] = False
    g = ctx.obj

    # FIXME: py34 cannot run components with threads
    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    threads = []

    try:
        # phantomjs (only when no external proxy was configured)
        if not g.get('phantomjs_proxy'):
            phantomjs_config = g.config.get('phantomjs', {})
            phantomjs_config.setdefault('auto_restart', True)
            threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
            time.sleep(2)
            # if it came up, point the fetchers at it
            if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor, **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))

        # running webui in main thread to make it exitable
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        ctx.invoke(webui, **webui_config)
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if not each.is_alive():
                continue
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join()
500
501
502
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')
@click.option('--processor-num', default=2, help='instance num of processor')
@click.option('--result-worker-num', default=1, help='instance num of result worker')
@click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
              help='run each components in thread or subprocess. '
              'always using thread for windows.')
@click.option('--total', default=10000, help="total url in test page")
@click.option('--show', default=20, help="show how many urls in a page")
@click.option('--taskdb-bench', default=False, is_flag=True,
              help="only run taskdb bench test")
@click.option('--message-queue-bench', default=False, is_flag=True,
              help="only run message queue bench test")
@click.option('--all-bench', default=False, is_flag=True,
              help="only run all bench test")
@click.pass_context
def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
          taskdb_bench, message_queue_bench, all_bench):
    """
    Run Benchmark test.
    In bench mode, in-memory sqlite database is used instead of on-disk sqlite database.

    Runs the taskdb and/or message-queue micro-benchmarks, then (unless a
    single bench was requested) spins up all components with Bench*
    subclasses, submits a synthetic project and waits for it to drain.
    """
    from pyspider.libs import bench
    from pyspider.webui import bench_test  # flake8: noqa

    ctx.obj['debug'] = False
    g = ctx.obj
    if result_worker_num == 0:
        # no result workers: drop the result queue entirely
        g['processor2result'] = None

    if run_in == 'subprocess' and os.name != 'nt':
        run_in = utils.run_in_subprocess
    else:
        run_in = utils.run_in_thread

    # no specific bench flag means: run everything
    all_test = not taskdb_bench and not message_queue_bench and not all_bench

    # test taskdb
    if all_test or taskdb_bench:
        bench.bench_test_taskdb(g.taskdb)
    # test message queue
    if all_test or message_queue_bench:
        bench.bench_test_message_queue(g.scheduler2fetcher)
    # test all
    if not all_test and not all_bench:
        return

    project_name = '__bench_test__'

    def clear_project():
        # drop the synthetic bench project from all three databases
        g.taskdb.drop(project_name)
        g.projectdb.drop(project_name)
        g.resultdb.drop(project_name)

    clear_project()
    g.projectdb.insert(project_name, {
        'name': project_name,
        'status': 'RUNNING',
        'script': bench.bench_script % {'total': total, 'show': show},
        'rate': total,
        'burst': total,
        'updatetime': time.time()
    })

    # disable log
    logging.getLogger().setLevel(logging.ERROR)
    logging.getLogger('scheduler').setLevel(logging.ERROR)
    logging.getLogger('fetcher').setLevel(logging.ERROR)
    logging.getLogger('processor').setLevel(logging.ERROR)
    logging.getLogger('result').setLevel(logging.ERROR)
    logging.getLogger('webui').setLevel(logging.ERROR)
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    try:
        threads = []

        # result worker
        result_worker_config = g.config.get('result_worker', {})
        for i in range(result_worker_num):
            threads.append(run_in(ctx.invoke, result_worker,
                                  result_cls='pyspider.libs.bench.BenchResultWorker',
                                  **result_worker_config))

        # processor
        processor_config = g.config.get('processor', {})
        for i in range(processor_num):
            threads.append(run_in(ctx.invoke, processor,
                                  processor_cls='pyspider.libs.bench.BenchProcessor',
                                  **processor_config))

        # fetcher
        fetcher_config = g.config.get('fetcher', {})
        fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
        for i in range(fetcher_num):
            threads.append(run_in(ctx.invoke, fetcher,
                                  fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                  **fetcher_config))

        # webui
        webui_config = g.config.get('webui', {})
        webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
        threads.append(run_in(ctx.invoke, webui, **webui_config))

        # scheduler
        scheduler_config = g.config.get('scheduler', {})
        scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
        scheduler_config.setdefault('xmlrpc_port', 23333)
        threads.append(run_in(ctx.invoke, scheduler,
                              scheduler_cls='pyspider.libs.bench.BenchScheduler',
                              **scheduler_config))
        scheduler_rpc = connect_rpc(ctx, None,
                                    'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)

        # give the components a moment to come up before submitting work
        time.sleep(2)

        # kick off the bench project via the scheduler's RPC interface
        scheduler_rpc.newtask({
            "project": project_name,
            "taskid": "on_start",
            "url": "data:,on_start",
            "process": {
                "callback": "on_start",
            },
        })

        # wait bench test finished (scheduler queue drained)
        while True:
            time.sleep(1)
            if scheduler_rpc.size() == 0:
                break
    finally:
        # exit components run in threading
        for each in g.instances:
            each.quit()

        # exit components run in subprocess
        for each in threads:
            if hasattr(each, 'terminate'):
                each.terminate()
            each.join(1)

        clear_project()
644
645
646
@cli.command()
@click.option('-i', '--interactive', default=False, is_flag=True,
              help='enable interactive mode, you can choose crawl url.')
@click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
              help='enable phantomjs, will spawn a subprocess for phantomjs')
@click.argument('scripts', nargs=-1)
@click.pass_context
def one(ctx, interactive, enable_phantomjs, scripts):
    """
    One mode not only means all-in-one, it runs every thing in one process over
    tornado.ioloop, for debug purpose
    """

    ctx.obj['debug'] = False
    g = ctx.obj
    # testing_mode makes each subcommand return its component object
    # instead of entering its blocking run loop
    g['testing_mode'] = True

    if scripts:
        # run the given script files via a local (file-based) projectdb and
        # replace databases that were only created as defaults
        from pyspider.database.local.projectdb import ProjectDB
        g['projectdb'] = ProjectDB(scripts)
        if g.get('is_taskdb_default'):
            g['taskdb'] = connect_database('sqlite+taskdb://')
        if g.get('is_resultdb_default'):
            g['resultdb'] = None

    if enable_phantomjs:
        phantomjs_config = g.config.get('phantomjs', {})
        phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
        if phantomjs_obj:
            g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
    else:
        phantomjs_obj = None

    result_worker_config = g.config.get('result_worker', {})
    if g.resultdb is None:
        # no resultdb: fall back to the one-mode result worker
        result_worker_config.setdefault('result_cls',
                                        'pyspider.result.OneResultWorker')
    result_worker_obj = ctx.invoke(result_worker, **result_worker_config)

    processor_config = g.config.get('processor', {})
    processor_config.setdefault('enable_stdout_capture', False)
    processor_obj = ctx.invoke(processor, **processor_config)

    fetcher_config = g.config.get('fetcher', {})
    fetcher_config.setdefault('xmlrpc', False)
    fetcher_obj = ctx.invoke(fetcher, **fetcher_config)

    scheduler_config = g.config.get('scheduler', {})
    scheduler_config.setdefault('xmlrpc', False)
    scheduler_config.setdefault('scheduler_cls',
                                'pyspider.scheduler.OneScheduler')
    scheduler_obj = ctx.invoke(scheduler, **scheduler_config)

    # let the one-scheduler drive all components on a single ioloop
    scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                           fetcher=fetcher_obj,
                           processor=processor_obj,
                           result_worker=result_worker_obj,
                           interactive=interactive)
    if scripts:
        for project in g.projectdb.projects:
            scheduler_obj.trigger_on_start(project)

    try:
        scheduler_obj.run()
    finally:
        scheduler_obj.quit()
        if phantomjs_obj:
            phantomjs_obj.quit()
714
715
716
@cli.command()
@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
@click.argument('project', nargs=1)
@click.argument('message', nargs=1)
@click.pass_context
def send_message(ctx, scheduler_rpc, project, message):
    """
    Send Message to project from command line

    Resolves a scheduler RPC proxy (explicit url > linked docker
    container > local default) and submits an on_message task carrying
    *message* to *project*.
    """
    rpc = scheduler_rpc
    if isinstance(rpc, six.string_types):
        rpc = connect_rpc(ctx, None, rpc)
    if rpc is None and os.environ.get('SCHEDULER_NAME'):
        tcp_addr = os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]
        rpc = connect_rpc(ctx, None, 'http://%s/' % (tcp_addr))
    if rpc is None:
        rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')

    task = {
        'taskid': utils.md5string('data:,on_message'),
        'project': project,
        'url': 'data:,on_message',
        'fetch': {
            'save': ('__command__', message),
        },
        'process': {
            'callback': '_on_message',
        }
    }
    return rpc.send_task(task)
744
745
746
def main():
    """Console-script entry point: run the click command group."""
    cli()
748
749
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
751