Scheduler.xmlrpc_run()   F
last analyzed

Complexity

Conditions 29

Size

Total Lines 107

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 29
dl 0
loc 107
rs 2
c 1
b 0
f 1

6 Methods

Rating   Name   Duplication   Size   Complexity  
A Scheduler.webui_update() 0 9 1
A Scheduler.dump_counter() 0 5 2
A Scheduler.new_task() 0 5 2
A Scheduler.update_project() 0 2 1
A Scheduler.get_projects_pause_status() 0 5 2
F Scheduler.get_active_tasks() 0 42 19

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like Scheduler.xmlrpc_run() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from pyspider.libs.base_handler import BaseHandler
21
from .task_queue import TaskQueue
22
23
logger = logging.getLogger('scheduler')
24
25
26
class Project(object):
    '''
    project for scheduler

    Holds the per-project runtime state: the in-memory task queue, a bounded
    history of recently active tasks, and the automatic pause/unpause state
    machine driven by consecutive task failures.
    '''
    def __init__(self, scheduler, project_info):
        '''
        scheduler    -- owning Scheduler (supplies tuning constants)
        project_info -- project record loaded from projectdb
        '''
        self.scheduler = scheduler

        self.active_tasks = deque(maxlen=scheduler.ACTIVE_TASKS)
        self.task_queue = TaskQueue()
        self.task_loaded = False
        self._selected_tasks = False  # selected tasks after recent pause
        self._send_finished_event_wait = 0  # wait for scheduler.FAIL_PAUSE_NUM loop steps before sending the event

        self.md5sum = None
        self._send_on_get_info = False
        self.waiting_get_info = True

        # _paused is a tri-state: False, True, or the string 'checking'
        self._paused = False
        self._paused_time = 0
        self._unpause_last_seen = None

        self.update(project_info)

    @property
    def paused(self):
        '''True while the project is paused by the failure circuit breaker.

        Evaluating this property advances the pause state machine as a side
        effect, based on the recent entries in ``active_tasks``.
        '''
        if self.scheduler.FAIL_PAUSE_NUM <= 0:
            # breaker disabled
            return False

        # unpaused --(last FAIL_PAUSE_NUM task failed)--> paused --(PAUSE_TIME)--> unpause_checking
        #                         unpaused <--(last UNPAUSE_CHECK_NUM task have success)--|
        #                             paused <--(last UNPAUSE_CHECK_NUM task no success)--|
        if not self._paused:
            fail_cnt = 0
            for _, task in self.active_tasks:
                # ignore select task
                if task.get('type') == self.scheduler.TASK_PACK:
                    continue
                if 'process' not in task['track']:
                    logger.error('process not in task, %r', task)
                    # BUGFIX: skip malformed entries -- the original fell
                    # through and raised KeyError on the access below
                    continue
                if task['track']['process']['ok']:
                    break
                else:
                    fail_cnt += 1
                if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM:
                    break
            if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM:
                self._paused = True
                self._paused_time = time.time()
        elif self._paused is True and (self._paused_time + self.scheduler.PAUSE_TIME < time.time()):
            # pause window elapsed: let a few probe tasks through
            self._paused = 'checking'
            self._unpause_last_seen = self.active_tasks[0][1] if len(self.active_tasks) else None
        elif self._paused == 'checking':
            cnt = 0
            fail_cnt = 0
            for _, task in self.active_tasks:
                # only inspect tasks newer than the checkpoint taken above
                if task is self._unpause_last_seen:
                    break
                # ignore select task
                if task.get('type') == self.scheduler.TASK_PACK:
                    continue
                cnt += 1
                if task['track']['process']['ok']:
                    # break with enough check cnt
                    cnt = max(cnt, self.scheduler.UNPAUSE_CHECK_NUM)
                    break
                else:
                    fail_cnt += 1
            if cnt >= self.scheduler.UNPAUSE_CHECK_NUM:
                if fail_cnt == cnt:
                    # every probe failed: pause again
                    self._paused = True
                    self._paused_time = time.time()
                else:
                    self._paused = False

        return self._paused is True

    def update(self, project_info):
        '''Refresh this project from a projectdb record.'''
        self.project_info = project_info

        self.name = project_info['name']
        self.group = project_info['group']
        self.db_status = project_info['status']
        self.updatetime = project_info['updatetime']

        md5sum = utils.md5string(project_info['script'])
        if self.md5sum != md5sum:
            # script changed: runtime info must be re-fetched
            self.waiting_get_info = True
            self.md5sum = md5sum
        if self.waiting_get_info and self.active:
            self._send_on_get_info = True

        if self.active:
            self.task_queue.rate = project_info['rate']
            self.task_queue.burst = project_info['burst']
        else:
            # inactive projects must not emit tasks
            self.task_queue.rate = 0
            self.task_queue.burst = 0

        logger.info('project %s updated, status:%s, paused:%s, %d tasks',
                    self.name, self.db_status, self.paused, len(self.task_queue))

    def on_get_info(self, info):
        '''Store runtime info returned by the processor's _on_get_info task.'''
        self.waiting_get_info = False
        self.min_tick = info.get('min_tick', 0)
        self.retry_delay = info.get('retry_delay', {})
        self.crawl_config = info.get('crawl_config', {})

    @property
    def active(self):
        # schedulable only in RUNNING or DEBUG status
        return self.db_status in ('RUNNING', 'DEBUG')
138
139
140
class Scheduler(object):
    # seconds between checks of projectdb for updated project records
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # defaults merged into a task's 'schedule' dict when fields are missing
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000  # max tasks handled per run_once pass
    LOOP_INTERVAL = 0.1  # seconds slept between run_once passes
    ACTIVE_TASKS = 100  # per-project history length of recently active tasks
    INQUEUE_LIMIT = 0  # max queued tasks per project (0 = unlimited)
    EXCEPTION_LIMIT = 3  # consecutive run_once failures tolerated before exit
    DELETE_TIME = 24 * 60 * 60  # seconds a STOPped project lingers before deletion
    # retry countdowns in seconds, keyed by retry number; '' is the fallback
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
    FAIL_PAUSE_NUM = 10  # consecutive failures that pause a project
    PAUSE_TIME = 5*60  # seconds a paused project stays paused
    UNPAUSE_CHECK_NUM = 3  # probe tasks used to decide whether to unpause

    # type tags for entries stored in Project.active_tasks
    TASK_PACK = 1
    STATUS_PACK = 2  # current not used
    REQUEST_PACK = 3  # current not used
169
170
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """Wire the scheduler to its databases, queues and counters.

        taskdb / projectdb / resultdb -- storage backends
        newtask_queue -- incoming new-task requests
        status_queue  -- task results coming back from the processor
        out_queue     -- tasks dispatched to the fetcher
        data_path     -- directory where counter snapshots are persisted
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()  # tasks awaiting retry when out_queue is full
        self._quit = False  # set by quit() to stop run()
        self._exceptions = 0  # consecutive run_once failures
        self.projects = dict()  # project name -> Project
        self._force_update_project = False
        self._last_update_project = 0
        self._last_tick = int(time.time())
        self._postpone_request = []  # requests deferred while their taskid is processing

        # counters over several time windows; 1h/1d/all are persisted to disk
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
205
206
    def _update_projects(self):
207
        '''Check project update'''
208
        now = time.time()
209
        if (
210
                not self._force_update_project
211
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
212
        ):
213
            return
214
        for project in self.projectdb.check_update(self._last_update_project):
215
            self._update_project(project)
216
            logger.debug("project: %s updated.", project['name'])
217
        self._force_update_project = False
218
        self._last_update_project = now
219
220
    # runtime attributes fetched from the script via the _on_get_info task
    get_info_attributes = ['min_tick', 'retry_delay', 'crawl_config']

    def _update_project(self, project):
        '''update one project'''
        if project['name'] not in self.projects:
            self.projects[project['name']] = Project(self, project)
        else:
            self.projects[project['name']].update(project)

        # from here on work with the Project object, not the db record
        project = self.projects[project['name']]

        if project._send_on_get_info:
            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            project._send_on_get_info = False
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project.name,
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': self.get_info_attributes,
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })

        # load task queue when project is running and delete task_queue when project is stoped
        if project.active:
            if not project.task_loaded:
                self._load_tasks(project)
                project.task_loaded = True
        else:
            if project.task_loaded:
                project.task_queue = TaskQueue()
                project.task_loaded = False

            # NOTE(review): this containment test uses the Project object while
            # the counters are keyed by project name -- presumably it should be
            # project.name; verify against CounterManager.__contains__
            if project not in self._cnt['all']:
                self._update_project_cnt(project.name)
260
261
    # fields loaded from taskdb when rebuilding a project's queue
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        '''load tasks from database'''
        task_queue = project.task_queue

        # requeue every ACTIVE task with its stored priority/exetime
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project.name, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            task_queue.put(taskid, priority, exetime)
        project.task_loaded = True
        logger.debug('project: %s loaded %d tasks.', project.name, len(task_queue))

        # NOTE(review): containment test uses the Project object against
        # name-keyed counters -- looks like it should be project.name; confirm
        if project not in self._cnt['all']:
            self._update_project_cnt(project.name)
        self._cnt['all'].value((project.name, 'pending'), len(project.task_queue))
281
282
    def _update_project_cnt(self, project_name):
283
        status_count = self.taskdb.status_count(project_name)
284
        self._cnt['all'].value(
285
            (project_name, 'success'),
286
            status_count.get(self.taskdb.SUCCESS, 0)
287
        )
288
        self._cnt['all'].value(
289
            (project_name, 'failed'),
290
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
291
        )
292
        self._cnt['all'].value(
293
            (project_name, 'pending'),
294
            status_count.get(self.taskdb.ACTIVE, 0)
295
        )
296
297
    def task_verify(self, task):
298
        '''
299
        return False if any of 'taskid', 'project', 'url' is not in task dict
300
                        or project in not in task_queue
301
        '''
302
        for each in ('taskid', 'project', 'url', ):
303
            if each not in task or not task[each]:
304
                logger.error('%s not in task: %.200r', each, task)
305
                return False
306
        if task['project'] not in self.projects:
307
            logger.error('unknown project: %s', task['project'])
308
            return False
309
310
        project = self.projects[task['project']]
311
        if not project.active:
312
            logger.error('project %s not started, please set status to RUNNING or DEBUG',
313
                         task['project'])
314
            return False
315
        return True
316
317
    def insert_task(self, task):
318
        '''insert task into database'''
319
        return self.taskdb.insert(task['project'], task['taskid'], task)
320
321
    def update_task(self, task):
322
        '''update task in database'''
323
        return self.taskdb.update(task['project'], task['taskid'], task)
324
325
    def put_task(self, task):
326
        '''put task to task queue'''
327
        _schedule = task.get('schedule', self.default_schedule)
328
        self.projects[task['project']].task_queue.put(
329
            task['taskid'],
330
            priority=_schedule.get('priority', self.default_schedule['priority']),
331
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
332
        )
333
334
    def send_task(self, task, force=True):
335
        '''
336
        dispatch task to fetcher
337
338
        out queue may have size limit to prevent block, a send_buffer is used
339
        '''
340
        try:
341
            self.out_queue.put_nowait(task)
342
        except Queue.Full:
343
            if force:
344
                self._send_buffer.appendleft(task)
345
            else:
346
                raise
347
348
    def _check_task_done(self):
349
        '''Check status queue'''
350
        cnt = 0
351
        try:
352
            while True:
353
                task = self.status_queue.get_nowait()
354
                # check _on_get_info result here
355
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
356
                    if task['project'] not in self.projects:
357
                        continue
358
                    project = self.projects[task['project']]
359
                    project.on_get_info(task['track'].get('save') or {})
360
                    logger.info(
361
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
362
                    )
363
                    continue
364
                elif not self.task_verify(task):
365
                    continue
366
                self.on_task_status(task)
367
                cnt += 1
368
        except Queue.Empty:
369
            pass
370
        return cnt
371
372
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        '''Check new task queue'''
        # re-examine postponed requests first: a request whose taskid is still
        # being processed stays postponed, otherwise it is replayed now
        todo = []
        for task in self._postpone_request:
            if task['project'] not in self.projects:
                continue
            if self.projects[task['project']].task_queue.is_processing(task['taskid']):
                todo.append(task)
            else:
                self.on_request(task)
        self._postpone_request = todo

        # drain up to LOOP_LIMIT new tasks, de-duplicating by taskid;
        # schedule.force_update lets a duplicate replace a queued task
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                if task['taskid'] in self.projects[task['project']].task_queue:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
418
419
    def _check_cronjob(self):
        """Check projects cronjob tick, return True when a new tick is sended"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        # at most one tick per wall-clock second; the caller loops on the
        # True return value until the tick counter catches up with real time
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if not project.active:
                continue
            # min_tick is unknown until the _on_get_info round trip completes
            if project.waiting_get_info:
                continue
            if int(project.min_tick) == 0:
                continue
            # fire only on multiples of the project's tick interval
            if self._last_tick % int(project.min_tick) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project.name,
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
450
451
    # full task record fields fetched from taskdb for dispatch to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        '''Select task to fetch & process

        Returns a dict mapping project name -> number of tasks selected.
        '''
        # first flush tasks buffered earlier when out_queue was full
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT

        # dynamic assign select limit for each project, use qsize as weight
        project_weights, total_weight = dict(), 0
        for project in itervalues(self.projects):  # type:Project
            if not project.active:
                continue
            # only check project pause when select new tasks, cronjob and new request still working
            if project.paused:
                continue
            if project.waiting_get_info:
                continue

            # task queue
            task_queue = project.task_queue  # type:TaskQueue
            pro_weight = task_queue.size()
            total_weight += pro_weight
            project_weights[project.name] = pro_weight
            pass

        min_project_limit = int(limit / 10.)  # ensure minimum select limit for each project
        max_project_limit = int(limit / 3.0)  # ensure maximum select limit for each project

        for pro_name, pro_weight in iteritems(project_weights):
            if cnt >= limit:
                break

            project = self.projects[pro_name]  # type:Project

            # task queue
            task_queue = project.task_queue
            task_queue.check_update()
            project_cnt = 0

            # calculate select limit for project
            if total_weight < 1 or pro_weight < 1:
                project_limit = min_project_limit
            else:
                # proportional share of the loop limit, clamped to [min, max]
                project_limit = int((1.0 * pro_weight / total_weight) * limit)
                if project_limit < min_project_limit:
                    project_limit = min_project_limit
                elif project_limit > max_project_limit:
                    project_limit = max_project_limit

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < project_limit:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project.name, taskid))
                # 'on_finished' is a control task; it is dispatched but does
                # not count against the project's quota
                if taskid != 'on_finished':
                    project_cnt += 1
                cnt += 1

            cnt_dict[project.name] = project_cnt
            if project_cnt:
                project._selected_tasks = True
                project._send_finished_event_wait = 0

            # check and send finished event to project
            if not project_cnt and len(task_queue) == 0 and project._selected_tasks:
                # wait for self.FAIL_PAUSE_NUM steps to make sure all tasks in queue have been processed
                if project._send_finished_event_wait < self.FAIL_PAUSE_NUM:
                    project._send_finished_event_wait += 1
                else:
                    project._selected_tasks = False
                    project._send_finished_event_wait = 0

                    self._postpone_request.append({
                        'project': project.name,
                        'taskid': 'on_finished',
                        'url': 'data:,on_finished',
                        'process': {
                            'callback': 'on_finished',
                        },
                        "schedule": {
                            "age": 0,
                            "priority": 9,
                            "force_update": True,
                        },
                    })

        # fetch the selected tasks' full records and dispatch them
        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
567
568
    def _load_put_task(self, project, taskid):
569
        try:
570
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
571
        except ValueError:
572
            logger.error('bad task pack %s:%s', project, taskid)
573
            return
574
        if not task:
575
            return
576
        task = self.on_select_task(task)
577
578
    def _print_counter_log(self):
        '''Log a one-line 5-minute summary plus the top active/failing projects.'''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUGFIX: the original tested `x[1] not in top_2_fails`, comparing a
        # project name against (count, name) tuples -- it never matched, so the
        # top-failing projects were also listed among the actives. Compare
        # against the set of failing project names instead.
        fail_names = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
615
616
    def _dump_cnt(self):
617
        '''Dump counters to file'''
618
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
619
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
620
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
621
622
    def _try_dump_cnt(self):
623
        '''Dump counters every 60 seconds'''
624
        now = time.time()
625
        if now - self._last_dump_cnt > 60:
626
            self._last_dump_cnt = now
627
            self._dump_cnt()
628
            self._print_counter_log()
629
630
    def _check_delete(self):
        '''Check project delete'''
        now = time.time()
        # iterate over a copy since entries are deleted inside the loop
        for project in list(itervalues(self.projects)):
            # only drop projects that have been STOPped for DELETE_TIME and
            # whose group carries the 'delete' tag
            if project.db_status != 'STOP':
                continue
            if now - project.updatetime < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project.group):
                continue

            logger.warning("deleting project: %s!", project.name)
            del self.projects[project.name]
            self.taskdb.drop(project.name)
            self.projectdb.drop(project.name)
            if self.resultdb:
                self.resultdb.drop(project.name)
            # purge the project from every counter window
            for each in self._cnt.values():
                del each[project.name]
649
650
    def __len__(self):
        '''Total number of queued tasks across all projects.'''
        total = 0
        for project in itervalues(self.projects):
            total += len(project.task_queue)
        return total
652
653
    def quit(self):
654
        '''Set quit signal'''
655
        self._quit = True
656
        # stop xmlrpc server
657
        if hasattr(self, 'xmlrpc_server'):
658
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
659
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
660
661
    def run_once(self):
662
        '''comsume queues and feed tasks to fetcher, once'''
663
664
        self._update_projects()
665
        self._check_task_done()
666
        self._check_request()
667
        while self._check_cronjob():
668
            pass
669
        self._check_select()
670
        self._check_delete()
671
        self._try_dump_cnt()
672
673
    def run(self):
        '''Run the scheduler loop until quit() is called or failures pile up.'''
        logger.info("scheduler starting...")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                # bail out after too many consecutive failures
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
            else:
                # a clean pass resets the failure streak
                self._exceptions = 0

        logger.info("scheduler exiting...")
        self._dump_cnt()
693
694
    def trigger_on_start(self, project):
695
        '''trigger an on_start callback of project'''
696
        self.newtask_queue.put({
697
            "project": project,
698
            "taskid": "on_start",
699
            "url": "data:,on_start",
700
            "process": {
701
                "callback": "on_start",
702
            },
703
        })
704
705
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
706
        '''Start xmlrpc interface'''
707
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
708
709
        application = WSGIXMLRPCApplication()
710
711
        application.register_function(self.quit, '_quit')
712
        application.register_function(self.__len__, 'size')
713
714
        def dump_counter(_time, _type):
715
            try:
716
                return self._cnt[_time].to_dict(_type)
717
            except:
718
                logger.exception('')
719
        application.register_function(dump_counter, 'counter')
720
721
        def new_task(task):
722
            if self.task_verify(task):
723
                self.newtask_queue.put(task)
724
                return True
725
            return False
726
        application.register_function(new_task, 'newtask')
727
728
        def send_task(task):
729
            '''dispatch task to fetcher'''
730
            self.send_task(task)
731
            return True
732
        application.register_function(send_task, 'send_task')
733
734
        def update_project():
735
            self._force_update_project = True
736
        application.register_function(update_project, 'update_project')
737
738
        def get_active_tasks(project=None, limit=100):
            '''Return up to `limit` most recent active tasks, newest first.

            Tasks are merged from the per-project `active_tasks` deques
            (optionally restricted to one project) and stripped down to a
            whitelisted set of fields before being serialized for XML-RPC.
            '''
            allowed_keys = set((
                'type',
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            def filter_task_fields(task):
                # Drop every key outside the whitelist; `track.fetch` and
                # `track.process` get their own stricter whitelist.
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]

            iters = [iter(x.active_tasks) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                # Pick the most recent entry across all projects.  Compare by
                # timestamp only: comparing the whole (updatetime, task) tuple
                # falls through to dict comparison on timestamp ties, which
                # raises TypeError on Python 3.
                updatetime, task = t = max((t for t in tasks if t),
                                           key=lambda t: t[0])
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                filter_task_fields(task)
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')
        def get_projects_pause_status():
            '''Map every known project name to its paused flag.'''
            return {name: proj.paused
                    for name, proj in iteritems(self.projects)}
        application.register_function(get_projects_pause_status, 'get_projects_pause_status')
        def webui_update():
            '''Bundle pause status and counter dumps for the web UI poller.'''
            counters = {
                '5m_time': dump_counter('5m_time', 'avg'),
                '5m': dump_counter('5m', 'sum'),
                '1h': dump_counter('1h', 'sum'),
                '1d': dump_counter('1d', 'sum'),
                'all': dump_counter('all', 'sum'),
            }
            return {
                'pause_status': get_projects_pause_status(),
                'counter': counters,
            }
        application.register_function(webui_update, 'webui_update')
        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        # Serve the WSGI XML-RPC application on a dedicated IOLoop so it can
        # be started/stopped independently of the scheduler's main loop.
        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('scheduler.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()
    def on_request(self, task):
        '''Entry point for an incoming request: route to new/old handling.

        Silently drops the task when the project's queue already holds
        INQUEUE_LIMIT entries.
        '''
        task_queue = self.projects[task['project']].task_queue
        if self.INQUEUE_LIMIT and len(task_queue) >= self.INQUEUE_LIMIT:
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
            return

        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                       fields=self.merge_task_fields)
        if not oldtask:
            return self.on_new_request(task)
        return self.on_old_request(task, oldtask)
    def on_new_request(self, task):
        '''Called when a new request is arrived'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        # a brand-new task counts as pending in every counter window
        project = task['project']
        for window in ('5m', '1h', '1d', 'all'):
            self._cnt[window].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        if _schedule.get('force_update') and self.projects[task['project']].task_queue.is_processing(task['taskid']):
            # when a task is in processing, the modify may conflict with the running task.
            # postpone the modify after task finished.
            logger.info('postpone modify task %(project)s:%(taskid)s %(url)s', task)
            self._postpone_request.append(task)
            return

        # a task restarts when its itag changed, its age expired, or a
        # force_update was requested (checked in that order, short-circuit)
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        itag_changed = bool(_schedule.get('itag')) and _schedule['itag'] != old_schedule.get('itag')
        age_expired = schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now
        restart = itag_changed or age_expired or bool(_schedule.get('force_update'))

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        if _schedule.get('cancel'):
            logger.info('cancel task %(project)s:%(taskid)s %(url)s', task)
            task['status'] = self.taskdb.BAD
            self.update_task(task)
            self.projects[task['project']].task_queue.delete(task['taskid'])
            return task

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            for window in ('5m', '1h', '1d'):
                self._cnt[window].event((project, 'pending'), +1)
        # moving out of a terminal state: transfer the all-time counter
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_status(self, task):
        '''Called when a status pack is arrived'''
        try:
            procesok = task['track']['process']['ok']
            if not self.projects[task['project']].task_queue.done(task['taskid']):
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        ret = self.on_task_done(task) if procesok else self.on_task_failed(task)

        # feed fetch/process timings into the 5-minute averaging counters
        fetch_time = task['track']['fetch'].get('time')
        if fetch_time:
            self._cnt['5m_time'].event((task['project'], 'fetch_time'), fetch_time)
        process_time = task['track']['process'].get('time')
        if process_time:
            self._cnt['5m_time'].event((task['project'], 'process_time'), process_time)
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            schedule = task['schedule']
            if schedule.get('auto_recrawl') and 'age' in schedule:
                # auto_recrawl: keep the task ACTIVE and queue the next
                # round `age` seconds from now
                task['status'] = self.taskdb.ACTIVE
                schedule['exetime'] = time.time() + schedule.get('age')
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        for window in ('5m', '1h', '1d'):
            self._cnt[window].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_failed(self, task):
        '''Called when a task is failed, called by `on_task_status`

        Applies the retry policy: the task is re-queued with a delay taken
        from the project's retry_delay table until `retries` is exhausted
        (or indefinitely while `auto_recrawl` is on), then marked FAILED.
        '''

        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # use the module logger with lazy %-args, consistent with
                # every other log call here (was root `logging` + eager `%`)
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects[task['project']]
        retry_delay = project_info.retry_delay or self.DEFAULT_RETRY_DELAY
        # per-attempt delay; '' is the catch-all entry of the delay table
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1  # no retries left -> fail permanently
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # lazy %-formatting (was eager `% task`); rendered message is identical
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'

        task.update({
            'type': self.TASK_PACK,
            'group': project_info.group,
            'project_md5sum': project_info.md5sum,
            'project_updatetime': project_info.updatetime,
        })

        # lazy join project.crawl_config
        if getattr(project_info, 'crawl_config', None):
            task = BaseHandler.task_join_crawl_config(task, project_info.crawl_config)

        project_info.active_tasks.appendleft((time.time(), task))
        self.send_task(task)
        return task
from tornado import gen
1012
1013
1014
class OneScheduler(Scheduler):
    """
    Scheduler Mixin class for one mode

    overwirted send_task method
    call processor.on_task(fetcher.fetch(task)) instead of consuming queue
    """

    def _check_select(self):
        """
        interactive mode of select tasks
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        banner = (
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if hasattr(shell, 'show_banner'):
            shell.show_banner(banner)
            shell.interact()
        else:
            shell.interact(banner)
        if not is_crawled:
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: return here.  Without the return the status pack fell
            # through and was processed a second time below, double-counting
            # the task (on_task_done/on_task_failed and counters ran twice).
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        '''Wire in the components shared by one mode (single process).'''
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        '''Fetch and process one task inline, draining follow-up queues.'''
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        type, task, response = result.args
        self.processor.on_task(task, response)
        # do with message
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        '''Run the task inline instead of pushing it to an out-queue.'''
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                # NOTE(review): the task is buffered AND still scheduled via
                # add_future below — looks like it could run twice; confirm
                # against how _send_buffer is drained.
                self._send_buffer.appendleft(task)
            else:
                raise self.outqueue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")
import random
1182
import threading
1183
from pyspider.database.sqlite.sqlitebase import SQLiteMixin
1184
1185
1186
class ThreadBaseScheduler(Scheduler):
    '''Scheduler variant that runs db-heavy callbacks on worker threads.

    Each worker thread lazily clones its own taskdb/projectdb/resultdb
    connection (thread-local), and per-task work is pinned to a fixed
    thread by taskid hash so ordering per task is preserved.
    '''

    def __init__(self, threads=4, *args, **kwargs):
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # SQLite connections must not be shared across threads
        if isinstance(self.taskdb, SQLiteMixin):
            self.threads = 1
        else:
            self.threads = threads

        # keep the original connections; per-thread copies are made lazily
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # clone the shared connection for the calling thread on first use
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # clone the shared connection for the calling thread on first use
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # clone the shared connection for the calling thread on first use
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        '''Spawn the daemon worker threads, one work queue per thread.'''
        for _ in range(self.threads):
            work_queue = Queue.Queue()
            worker = threading.Thread(target=self._thread_worker, args=(work_queue, ))
            worker.daemon = True
            worker.start()
            self.thread_objs.append(worker)
            self.thread_queues.append(work_queue)

    def _thread_worker(self, queue):
        '''Worker loop: pull (method, args, kwargs) items and execute them.'''
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        '''Queue `method` for execution on a worker thread.

        `_i` pins the call to thread `_i % threads` (ordering guarantee);
        `_block` waits for all queues to drain before returning.
        '''
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is not None:
            queue = self.thread_queues[i % len(self.thread_queues)]
        else:
            # prefer an idle queue; otherwise wait (when blocking) or
            # fall back to a random one
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        '''Spin until every worker queue is empty.'''
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # shard by taskid so status packs for one task stay ordered
        self._run_in_thread(Scheduler.on_task_status, self, task,
                            _i=hash(task['taskid']))

    def on_request(self, task):
        # shard by taskid so requests for one task stay ordered
        self._run_in_thread(Scheduler.on_request, self, task,
                            _i=hash(task['taskid']))

    def _load_put_task(self, project, taskid):
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid,
                            _i=hash(taskid))

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()