Completed
Push — master ( e6dbce...addc19 )
by Roy
02:40
created

Scheduler   F

Complexity

Total Complexity 179

Size/Duplication

Total Lines 800
Duplicated Lines 0 %

Importance

Changes 8
Bugs 2 Features 2
Metric Value
c 8
b 2
f 2
dl 0
loc 800
rs 1.263
wmc 179

36 Methods

Rating   Name   Duplication   Size   Complexity  
B run() 0 20 5
F _print_counter_log() 0 37 9
B _check_delete() 0 19 7
A quit() 0 7 2
A run_once() 0 11 2
A insert_task() 0 3 1
A dump_counter() 0 5 2
C _check_cronjob() 0 31 7
F _check_select() 0 61 15
A _load_tasks() 0 18 3
A _try_dump_cnt() 0 7 2
A new_task() 0 5 2
A update_project() 0 2 1
A on_select_task() 0 18 3
A put_task() 0 7 1
F xmlrpc_run() 0 86 26
F _check_request() 0 44 14
A _update_projects() 0 13 4
B on_task_done() 0 22 4
A _load_put_task() 0 9 3
A send_task() 0 13 1
A __len__() 0 2 2
A _dump_cnt() 0 5 1
A on_request() 0 11 4
B __init__() 0 35 6
A on_new_request() 0 13 1
D _check_task_done() 0 23 8
A _update_project_cnt() 0 13 1
F get_active_tasks() 0 41 19
B on_task_status() 0 24 6
A trigger_on_start() 0 8 1
F on_old_request() 0 49 14
D on_task_failed() 0 52 9
A update_task() 0 3 1
C task_verify() 0 22 7
C _update_project() 0 38 7

How to fix   Complexity   

Complex Class

Complex classes like Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from pyspider.libs.base_handler import BaseHandler
21
from .task_queue import TaskQueue
22
23
# Module-level logger shared by Project and Scheduler ('scheduler' channel).
logger = logging.getLogger('scheduler')
24
25
26
class Project(object):
    '''
    project for scheduler

    In-memory per-project state: the task queue, a ring of recently handled
    tasks, and metadata cached from the projectdb record.
    '''
    def __init__(self, project_info, ACTIVE_TASKS=100):
        '''
        :param project_info: projectdb record (name/group/status/script/...)
        :param ACTIVE_TASKS: size of the recent-task ring buffer
        '''
        self.paused = False

        # newest-first ring of (timestamp, task) pairs, bounded by ACTIVE_TASKS
        self.active_tasks = deque(maxlen=ACTIVE_TASKS)
        self.task_queue = TaskQueue()
        self.task_loaded = False
        self._send_finished_event = False

        # md5 of the project script; compared on update to detect script changes
        self.md5sum = None
        self._send_on_get_info = False
        self.waiting_get_info = True

        self.update(project_info)

    def update(self, project_info):
        # Refresh cached fields from a (possibly changed) projectdb record.
        self.project_info = project_info

        self.name = project_info['name']
        self.group = project_info['group']
        self.db_status = project_info['status']
        self.updatetime = project_info['updatetime']

        # script changed (or runtime info never fetched) on an active project:
        # request a _on_get_info round trip; compare BEFORE storing the new sum
        md5sum = utils.md5string(project_info['script'])
        if (self.md5sum != md5sum or self.waiting_get_info) and self.active:
            self._send_on_get_info = True
            self.waiting_get_info = True
        self.md5sum = md5sum

        # inactive projects get zero rate/burst so nothing gets selected
        if self.active:
            self.task_queue.rate = project_info['rate']
            self.task_queue.burst = project_info['burst']
        else:
            self.task_queue.rate = 0
            self.task_queue.burst = 0

        logger.info('project %s updated, status:%s, paused:%s, %d tasks',
                    self.name, self.db_status, self.paused, len(self.task_queue))

    def on_get_info(self, info):
        # Store runtime info returned by the processor's _on_get_info callback.
        self.waiting_get_info = False
        self.min_tick = info.get('min_tick', 0)
        self.retry_delay = info.get('retry_delay', {})
        self.crawl_config = info.get('crawl_config', {})

    @property
    def active(self):
        # schedulable only when RUNNING/DEBUG and not paused
        return self.db_status in ('RUNNING', 'DEBUG') and not self.paused
79
80
81
class Scheduler(object):
    """Core dispatch loop of pyspider.

    Consumes the new-task and status queues, keeps per-project task queues,
    selects tasks for the fetcher, and maintains success/fail counters.
    """
    # seconds between projectdb change polls
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # schedule defaults applied when a task carries no explicit schedule
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000  # max items handled per queue per run_once pass
    LOOP_INTERVAL = 0.1  # seconds slept between run_once passes
    ACTIVE_TASKS = 100  # recent-task ring size per project
    INQUEUE_LIMIT = 0  # per-project queue cap; 0 disables the cap
    EXCEPTION_LIMIT = 3  # consecutive loop errors tolerated before run() exits
    DELETE_TIME = 24 * 60 * 60  # grace period before a STOPped project is dropped
    # retry delay in seconds keyed by retry count; '' is the fallback key
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
103
104
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """Wire the scheduler to its databases and queues.

        :param taskdb:        task database (status/schedule/track per task)
        :param projectdb:     project database (scripts and metadata)
        :param newtask_queue: inbound queue of new/updated crawl requests
        :param status_queue:  inbound queue of status packs from the processor
        :param out_queue:     outbound queue of tasks for the fetcher
        :param data_path:     directory where counter snapshots are persisted
        :param resultdb:      optional result database (only used for drops)
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue, retried later
        self._send_buffer = deque()
        self._quit = False
        # consecutive run_once failures; run() bails past EXCEPTION_LIMIT
        self._exceptions = 0
        # project name -> Project
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        # whole-second cronjob clock, advanced by _check_cronjob
        self._last_tick = int(time.time())
        # force_update requests deferred while their task was being processed
        self._postpone_request = []

        # event counters at several time resolutions; '5m_time' averages
        # fetch/process durations, the others count task state transitions
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore the persistent counters dumped by _dump_cnt on a prior run
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
139
140
    def _update_projects(self):
141
        '''Check project update'''
142
        now = time.time()
143
        if (
144
                not self._force_update_project
145
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
146
        ):
147
            return
148
        for project in self.projectdb.check_update(self._last_update_project):
149
            self._update_project(project)
150
            logger.debug("project: %s updated.", project['name'])
151
        self._force_update_project = False
152
        self._last_update_project = now
153
154
    # project-runtime keys requested from the processor via _on_get_info
    get_info_attributes = ['min_tick', 'retry_delay', 'crawl_config']
155
156
    def _update_project(self, project):
157
        '''update one project'''
158
        if project['name'] not in self.projects:
159
            self.projects[project['name']] = Project(project, ACTIVE_TASKS=self.ACTIVE_TASKS)
160
        else:
161
            self.projects[project['name']].update(project)
162
163
        project = self.projects[project['name']]
164
165
        if project._send_on_get_info:
166
            # update project runtime info from processor by sending a _on_get_info
167
            # request, result is in status_page.track.save
168
            project._send_on_get_info = False
169
            self.on_select_task({
170
                'taskid': '_on_get_info',
171
                'project': project.name,
172
                'url': 'data:,_on_get_info',
173
                'status': self.taskdb.SUCCESS,
174
                'fetch': {
175
                    'save': self.get_info_attributes,
176
                },
177
                'process': {
178
                    'callback': '_on_get_info',
179
                },
180
            })
181
182
        # load task queue when project is running and delete task_queue when project is stoped
183
        if project.active:
184
            if not project.task_loaded:
185
                self._load_tasks(project)
186
                project.task_loaded = True
187
        else:
188
            if project.task_loaded:
189
                project.task_queue = TaskQueue()
190
                project.task_loaded = False
191
192
            if project not in self._cnt['all']:
193
                self._update_project_cnt(project.name)
194
195
    # minimal fields needed to rebuild the in-memory task queue from taskdb
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
196
197
    def _load_tasks(self, project):
        '''Load a project's ACTIVE tasks from taskdb into its task queue.'''
        task_queue = project.task_queue

        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project.name, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            task_queue.put(taskid, priority, exetime)
        project.task_loaded = True
        logger.debug('project: %s loaded %d tasks.', project.name, len(task_queue))

        # BUG FIX: counters are keyed by project *name*. The old code tested
        # the Project object (never a counter key) and then passed the object
        # -- not its name -- to _update_project_cnt, unlike the sibling call
        # in _update_project, corrupting the counter key / taskdb query.
        if project.name not in self._cnt['all']:
            self._update_project_cnt(project.name)
        self._cnt['all'].value((project.name, 'pending'), len(project.task_queue))
215
216
    def _update_project_cnt(self, project_name):
217
        status_count = self.taskdb.status_count(project_name)
218
        self._cnt['all'].value(
219
            (project_name, 'success'),
220
            status_count.get(self.taskdb.SUCCESS, 0)
221
        )
222
        self._cnt['all'].value(
223
            (project_name, 'failed'),
224
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
225
        )
226
        self._cnt['all'].value(
227
            (project_name, 'pending'),
228
            status_count.get(self.taskdb.ACTIVE, 0)
229
        )
230
231
    def task_verify(self, task):
232
        '''
233
        return False if any of 'taskid', 'project', 'url' is not in task dict
234
                        or project in not in task_queue
235
        '''
236
        for each in ('taskid', 'project', 'url', ):
237
            if each not in task or not task[each]:
238
                logger.error('%s not in task: %.200r', each, task)
239
                return False
240
        if task['project'] not in self.projects:
241
            logger.error('unknown project: %s', task['project'])
242
            return False
243
244
        project = self.projects[task['project']]
245
        if not project.active:
246
            if project.paused:
247
                logger.error('project %s paused', task['project'])
248
            else:
249
                logger.error('project %s not started, please set status to RUNNING or DEBUG',
250
                             task['project'])
251
            return False
252
        return True
253
254
    def insert_task(self, task):
255
        '''insert task into database'''
256
        return self.taskdb.insert(task['project'], task['taskid'], task)
257
258
    def update_task(self, task):
259
        '''update task in database'''
260
        return self.taskdb.update(task['project'], task['taskid'], task)
261
262
    def put_task(self, task):
263
        '''put task to task queue'''
264
        _schedule = task.get('schedule', self.default_schedule)
265
        self.projects[task['project']].task_queue.put(
266
            task['taskid'],
267
            priority=_schedule.get('priority', self.default_schedule['priority']),
268
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
269
        )
270
271
    def send_task(self, task, force=True):
272
        '''
273
        dispatch task to fetcher
274
275
        out queue may have size limit to prevent block, a send_buffer is used
276
        '''
277
        try:
278
            self.out_queue.put_nowait(task)
279
        except Queue.Full:
280
            if force:
281
                self._send_buffer.appendleft(task)
282
            else:
283
                raise
284
285
    def _check_task_done(self):
286
        '''Check status queue'''
287
        cnt = 0
288
        try:
289
            while True:
290
                task = self.status_queue.get_nowait()
291
                # check _on_get_info result here
292
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
293
                    if task['project'] not in self.projects:
294
                        continue
295
                    project = self.projects[task['project']]
296
                    project.on_get_info(task['track'].get('save') or {})
297
                    logger.info(
298
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
299
                    )
300
                    continue
301
                elif not self.task_verify(task):
302
                    continue
303
                self.on_task_status(task)
304
                cnt += 1
305
        except Queue.Empty:
306
            pass
307
        return cnt
308
309
    # fields fetched from taskdb when merging an incoming request with history
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
310
311
    def _check_request(self):
        '''Check new task queue

        Re-tries postponed force_update requests, then drains up to
        LOOP_LIMIT de-duplicated tasks from newtask_queue and routes each
        through on_request. Returns the number of tasks handled.
        '''
        # check _postpone_request first: keep requests whose task is still
        # being processed, handle the rest now
        todo = []
        for task in self._postpone_request:
            if task['project'] not in self.projects:
                # project vanished in the meantime; drop the request
                continue
            if self.projects[task['project']].task_queue.is_processing(task['taskid']):
                todo.append(task)
            else:
                self.on_request(task)
        self._postpone_request = todo

        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task dict or a batch (list) of them
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued for this project: only force_update re-submits
                if task['taskid'] in self.projects[task['project']].task_queue:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this drain: first wins unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
355
356
    def _check_cronjob(self):
357
        """Check projects cronjob tick, return True when a new tick is sended"""
358
        now = time.time()
359
        self._last_tick = int(self._last_tick)
360
        if now - self._last_tick < 1:
361
            return False
362
        self._last_tick += 1
363
        for project in itervalues(self.projects):
364
            if not project.active:
365
                continue
366
            if project.waiting_get_info:
367
                continue
368
            if project.min_tick == 0:
369
                continue
370
            if self._last_tick % int(project.min_tick) != 0:
371
                continue
372
            self.on_select_task({
373
                'taskid': '_on_cronjob',
374
                'project': project.name,
375
                'url': 'data:,_on_cronjob',
376
                'status': self.taskdb.SUCCESS,
377
                'fetch': {
378
                    'save': {
379
                        'tick': self._last_tick,
380
                    },
381
                },
382
                'process': {
383
                    'callback': '_on_cronjob',
384
                },
385
            })
386
        return True
387
388
    # full field set loaded for a task that is about to be sent to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
399
400
    def _check_select(self):
        '''Select task to fetch & process

        Flushes the send buffer, then pulls ready taskids from each active
        project's queue (bounded per pass and per project) and dispatches
        them. Returns {project_name: selected_count}.
        '''
        # flush previously buffered tasks first; stop as soon as the queue fills
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            # fetcher is saturated; select nothing this pass
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project in itervalues(self.projects):
            if not project.active:
                continue
            # skip projects whose runtime info has not been fetched yet
            if project.waiting_get_info:
                continue
            if cnt >= limit:
                break

            # task queue
            task_queue = project.task_queue
            task_queue.check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            # each project gets at most a tenth of the per-pass budget
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project.name, taskid))
                project_cnt += 1
                cnt += 1

            cnt_dict[project.name] = project_cnt
            if project_cnt:
                project._send_finished_event = True
            # check and send finished event to project
            elif len(task_queue) == 0 and project._send_finished_event:
                project._send_finished_event = False
                self.on_select_task({
                    'taskid': 'on_finished',
                    'project': project.name,
                    'url': 'data:,on_finished',
                    'status': self.taskdb.SUCCESS,
                    'process': {
                        'callback': 'on_finished',
                    },
                })

        # load the selected tasks from taskdb and hand them to the fetcher
        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
461
462
    def _load_put_task(self, project, taskid):
463
        try:
464
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
465
        except ValueError:
466
            logger.error('bad task pack %s:%s', project, taskid)
467
            return
468
        if not task:
469
            return
470
        task = self.on_select_task(task)
471
472
    def _print_counter_log(self):
        '''Log a 5-minute summary: totals plus the busiest / most-failing projects.'''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = dict.fromkeys(keywords, 0)
        project_actives = []
        project_fails = []
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUG FIX: exclude by project *name*; the old code tested the name
        # against the (count, name) tuples of top_2_fails, which never
        # matched, so a failing project could be listed twice below.
        fail_names = set(name for _, name in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
509
510
    def _dump_cnt(self):
511
        '''Dump counters to file'''
512
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
513
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
514
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
515
516
    def _try_dump_cnt(self):
517
        '''Dump counters every 60 seconds'''
518
        now = time.time()
519
        if now - self._last_dump_cnt > 60:
520
            self._last_dump_cnt = now
521
            self._dump_cnt()
522
            self._print_counter_log()
523
524
    def _check_delete(self):
        '''Drop projects that are STOPped, grouped 'delete', and idle past DELETE_TIME.'''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project.db_status != 'STOP':
                continue
            if now - project.updatetime < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project.group):
                continue

            logger.warning("deleting project: %s!", project.name)
            name = project.name
            del self.projects[name]
            # purge the project from every backing store and counter
            self.taskdb.drop(name)
            self.projectdb.drop(name)
            if self.resultdb:
                self.resultdb.drop(name)
            for counters in self._cnt.values():
                del counters[name]
543
544
    def __len__(self):
        '''Total number of queued tasks across every project.'''
        total = 0
        for project in itervalues(self.projects):
            total += len(project.task_queue)
        return total
546
547
    def quit(self):
548
        '''Set quit signal'''
549
        self._quit = True
550
        # stop xmlrpc server
551
        if hasattr(self, 'xmlrpc_server'):
552
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
553
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
554
555
    def run_once(self):
        '''comsume queues and feed tasks to fetcher, once'''

        self._update_projects()       # pick up projectdb changes
        self._check_task_done()       # drain status packs from the processor
        self._check_request()         # drain new/updated crawl requests
        while self._check_cronjob():  # catch up missed cronjob ticks
            pass
        self._check_select()          # feed ready tasks to the fetcher
        self._check_delete()          # drop long-stopped 'delete' projects
        self._try_dump_cnt()          # persist counters (once a minute)
566
567
    def run(self):
        '''Run the scheduler loop until quit or too many consecutive errors.'''
        logger.info("scheduler starting...")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                # give up after EXCEPTION_LIMIT consecutive failing passes
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
            else:
                # a clean pass resets the consecutive-failure counter
                self._exceptions = 0

        logger.info("scheduler exiting...")
        self._dump_cnt()
587
588
    def trigger_on_start(self, project):
589
        '''trigger an on_start callback of project'''
590
        self.newtask_queue.put({
591
            "project": project,
592
            "taskid": "on_start",
593
            "url": "data:,on_start",
594
            "process": {
595
                "callback": "on_start",
596
            },
597
        })
598
599
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface

        Registers control endpoints (_quit, size, counter, newtask, send_task,
        update_project, get_active_tasks) and serves them on a dedicated
        tornado IOLoop. Blocks until quit() stops the loop.
        '''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # expose one counter group ('5m', '1h', ...) as a plain dict
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                # NOTE(review): bare except deliberately keeps the rpc alive
                # on bad arguments; it logs and implicitly returns None
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            # validate then enqueue; returns whether the task was accepted
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force the next run_once pass to re-read projectdb
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # whitelist of task fields exposed over rpc
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            # whitelist of track.fetch / track.process sub-fields
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            # merge the newest-first active_tasks rings of the selected
            # project(s), repeatedly taking the most recent head entry
            iters = [iter(x.active_tasks) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                # pick the (updatetime, task) pair with the newest timestamp
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip non-whitelisted fields in place before returning
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        # serve the WSGI xmlrpc app on its own IOLoop so quit() can stop it
        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('scheduler.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()
685
686
    def on_request(self, task):
687
        if self.INQUEUE_LIMIT and len(self.projects[task['project']].task_queue) >= self.INQUEUE_LIMIT:
688
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
689
            return
690
691
        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
692
                                       fields=self.merge_task_fields)
693
        if oldtask:
694
            return self.on_old_request(task, oldtask)
695
        else:
696
            return self.on_new_request(task)
697
698
    def on_new_request(self, task):
        '''Handle a never-seen request: persist it as ACTIVE and queue it.'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        # a brand-new task bumps 'pending' at every counter resolution
        for span in ('5m', '1h', '1d', 'all'):
            self._cnt[span].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
711
712
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived

        Decides whether the re-submitted task should restart (itag change,
        expired age, or force_update), be cancelled, or be ignored, and
        migrates the state counters accordingly.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        if _schedule.get('force_update') and self.projects[task['project']].task_queue.is_processing(task['taskid']):
            # when a task is in processing, the modify may conflict with the running task.
            # postpone the modify after task finished.
            logger.info('postpone modify task %(project)s:%(taskid)s %(url)s', task)
            self._postpone_request.append(task)
            return

        # restart when the itag changed, the age expired, or on force_update
        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        if _schedule.get('cancel'):
            # explicit cancel: mark BAD and remove it from the queue
            logger.info('cancel task %(project)s:%(taskid)s %(url)s', task)
            task['status'] = self.taskdb.BAD
            self.update_task(task)
            self.projects[task['project']].task_queue.delete(task['taskid'])
            return task

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # move counters from the task's previous terminal state to 'pending'
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
761
762
    def on_task_status(self, task):
        '''Called when a status pack arrives from the processor.

        Marks the task as done in its project's task queue, dispatches to
        `on_task_done` / `on_task_failed` depending on the process result,
        records fetch/process timing counters, and remembers the task in the
        project's recent active-task list.

        Returns the updated task dict, or None for a malformed status pack.
        '''
        try:
            procesok = task['track']['process']['ok']
            if not self.projects[task['project']].task_queue.done(task['taskid']):
                # fix: use the module-level `logger` (was `logging.error`,
                # which logs to the root logger and bypasses this module's
                # logger configuration, unlike every other call in this file)
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # timing counters are only recorded when the track carries them
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret
    def on_task_done(self, task):
        '''Handle a successfully processed task; invoked by `on_task_status`.

        Marks the task SUCCESS (or re-queues it as ACTIVE when auto_recrawl
        is enabled with an age), persists it, and bumps the success counters.
        '''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            schedule = task['schedule']
            if schedule.get('auto_recrawl') and 'age' in schedule:
                # auto recrawl: schedule the next crawl `age` seconds from now
                task['status'] = self.taskdb.ACTIVE
                schedule['exetime'] = time.time() + schedule.get('age')
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        for window in ('5m', '1h', '1d'):
            self._cnt[window].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_failed(self, task):
        '''Handle a task whose processing failed; invoked by `on_task_status`.

        Chooses between retrying (with a per-project retry delay table keyed
        by retry count) and giving up: when retries are exhausted the task is
        marked FAILED, otherwise it is re-queued with an incremented retried
        count and a delayed exetime.

        Returns the updated task, or None for an unknown status pack.
        '''
        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # fix: use the module logger with lazy formatting
                # (was `logging.error('...' % task)` on the root logger)
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects[task['project']]
        retry_delay = project_info.retry_delay or self.DEFAULT_RETRY_DELAY
        # delay keyed by retry count; '' is the catch-all fallback entry
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto-recrawl tasks never give up; cap the delay at `age`
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            # retries exhausted: record the task as permanently FAILED
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # fix: pass `task` as a lazy logging argument instead of eager
            # `%` interpolation, matching the sibling logger calls
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            # schedule a retry `next_exetime` seconds from now
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
    def on_select_task(self, task):
        '''Called when a task has been selected for fetching & processing.

        Enriches the task with metadata from its owning project (group,
        md5sum, updatetime), lazily merges the project-level crawl_config,
        then hands the task to the fetcher via `send_task`.
        '''
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project = self.projects.get(task['project'])
        assert project, 'no such project'

        # inject information about the owning project
        task['group'] = project.group
        task['project_md5sum'] = project.md5sum
        task['project_updatetime'] = project.updatetime

        # lazy join of project.crawl_config, only when one is configured
        crawl_config = getattr(project, 'crawl_config', None)
        if crawl_config:
            task = BaseHandler.task_join_crawl_config(task, crawl_config)

        project.active_tasks.appendleft((time.time(), task))
        self.send_task(task)
        return task
883
from tornado import gen
884
885
886
class OneScheduler(Scheduler):
    """
    Scheduler Mixin class for one mode

    overwrites the send_task method:
    call processor.on_task(fetcher.fetch(task)) instead of consuming queue
    """

    def _check_select(self):
        """
        interactive mode of select tasks
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb; prefer the stored parameters when the
            # caller supplied none
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        # nothing crawled and not quitting interactive mode -> shut down
        if not is_crawled:
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # fix: return the super() result here -- falling through would
            # process the same status pack a second time (double counter
            # increments and a duplicate active_tasks entry)
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        '''Wire the in-process fetcher/processor/result_worker together.'''
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        '''Fetch and process one task inline, draining the processor queues.'''
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        _type, task, response = result.args  # renamed: don't shadow builtin `type`
        self.processor.on_task(task, response)
        # do with messages the handler emitted while processing
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        '''Run the task inline instead of pushing it to a fetcher queue.'''
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                # fix: return after buffering -- the task will be re-sent
                # from _send_buffer later; dispatching it here as well
                # would fetch it twice
                self._send_buffer.appendleft(task)
                return
            else:
                raise self.outqueue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        '''Drive `run_once` on a tornado periodic callback.'''
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")
import random
1049
import threading
1050
1051
1052
class ThreadBaseScheduler(Scheduler):
    '''Scheduler variant that offloads db-heavy work to a pool of threads.

    Each worker thread lazily clones its own taskdb/projectdb/resultdb
    connection (kept in thread-local storage) so database handles are never
    shared across threads. Work items are (method, args, kwargs) tuples
    pushed onto per-thread queues.
    '''

    def __init__(self, threads=4, *args, **kwargs):
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # keep the prototype connections; worker threads clone on demand
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # first access from this thread: clone the prototype connection
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        '''Spawn the daemon worker threads, one inbox queue per worker.'''
        for _ in range(self.threads):
            inbox = Queue.Queue()
            worker = threading.Thread(target=self._thread_worker, args=(inbox, ))
            worker.daemon = True
            worker.start()
            self.thread_objs.append(worker)
            self.thread_queues.append(inbox)

    def _thread_worker(self, queue):
        '''Worker loop: execute queued calls forever, logging any failure.'''
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        '''Dispatch `method(*args, **kwargs)` onto a worker thread.

        `_i` pins the call to a specific worker (hash-based affinity so
        calls for the same task stay ordered); `_block` additionally waits
        for every worker queue to drain before returning.
        '''
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is not None:
            queue = self.thread_queues[i % len(self.thread_queues)]
        else:
            while True:
                # prefer the first idle worker ...
                idle = [q for q in self.thread_queues if q.empty()]
                if idle:
                    queue = idle[0]
                    break
                # ... otherwise either wait for one or pick at random
                if not block:
                    queue = self.thread_queues[random.randint(0, len(self.thread_queues) - 1)]
                    break
                time.sleep(0.1)

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        '''Poll until every worker queue is empty.'''
        while not all(queue.empty() for queue in self.thread_queues):
            time.sleep(0.1)

    def _update_project(self, project):
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # pin to a worker by taskid so status updates stay ordered per task
        self._run_in_thread(Scheduler.on_task_status, self, task,
                            _i=hash(task['taskid']))

    def on_request(self, task):
        self._run_in_thread(Scheduler.on_request, self, task,
                            _i=hash(task['taskid']))

    def _load_put_task(self, project, taskid):
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid,
                            _i=hash(taskid))

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()