Completed
Push — master ( 8f71e0...468202 )
by Roy
01:20
created

pyspider.scheduler.Scheduler.on_task_failed()   D

Complexity

Conditions 9

Size

Total Lines 52

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 9
dl 0
loc 52
rs 4.6097

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import os
10
import json
11
import time
12
import logging
13
import itertools
14
from collections import deque
15
16
from six import iteritems, itervalues
17
18
from pyspider.libs import counter, utils
19
from six.moves import queue as Queue
20
from .task_queue import TaskQueue
21
logger = logging.getLogger('scheduler')
22
23
24
class Scheduler(object):
    """Task scheduler: consumes new-task and status queues, maintains a
    per-project TaskQueue, and dispatches selected tasks to the fetcher."""

    # Re-read project definitions from projectdb at most every 5 minutes
    # (unless a reload is forced via the xmlrpc `update_project` call).
    UPDATE_PROJECT_INTERVAL = 5 * 60

    # Fallback values used wherever a task carries no 'schedule' dict.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    # Max items handled per loop pass (new tasks, selected tasks).
    LOOP_LIMIT = 1000
    # Seconds slept between run_once() iterations.
    LOOP_INTERVAL = 0.1
    # Length of each project's recent-task deque ('active_tasks').
    ACTIVE_TASKS = 100
    # Per-project in-queue cap checked in on_request; 0 disables the cap.
    INQUEUE_LIMIT = 0
    # run() exits after this many consecutive loop exceptions.
    EXCEPTION_LIMIT = 3
    # STOPped projects in the 'delete' group are dropped after 24h.
    DELETE_TIME = 24 * 60 * 60
    # Retry back-off in seconds keyed by retry count; '' is the fallback.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
46
47
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """Wire up storage backends, queues and in-memory scheduling state.

        newtask_queue and status_queue are consumed by the loop; out_queue
        feeds the fetcher; counter snapshots are persisted under data_path.
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue (see send_task)
        self._send_buffer = deque()
        self._quit = False
        self._exceptions = 0  # consecutive run-loop failures (see run())
        self.projects = dict()  # project name -> cached project info dict
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()  # project name -> TaskQueue
        self._last_tick = int(time.time())

        # Event counters at several time resolutions; '5m_time' averages
        # fetch/process durations, the others count task transitions.
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore counters persisted by a previous run (see _dump_cnt)
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
82
83
    def _update_projects(self):
        '''Reload changed projects from projectdb when due or forced.'''
        now = time.time()
        interval_elapsed = self._last_update_project + self.UPDATE_PROJECT_INTERVAL <= now
        if not (self._force_update_project or interval_elapsed):
            return
        for changed in self.projectdb.check_update(self._last_update_project):
            self._update_project(changed)
            logger.debug("project: %s updated.", changed['name'])
        self._force_update_project = False
        self._last_update_project = now
96
97
    def _update_project(self, project):
        '''Apply one updated project record to in-memory state.

        Refreshes the cached project dict and its script md5sum, creates or
        tears down the project's TaskQueue according to its status, and asks
        the processor for runtime info when the project is running.
        '''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]

            # BUG FIX: counters are keyed by project *name*; the original
            # tested the project dict itself (`project not in ...`), which can
            # never match a name key and may raise TypeError for an unhashable
            # dict, so the counter seeding misbehaved for stopped projects.
            if project['name'] not in self._cnt['all']:
                self._update_project_cnt(project['name'])
135
136
    # minimal fields needed to rebuild in-memory queue entries from taskdb
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        '''load tasks from database'''
        # new queue starts throttled (rate/burst 0) until status is known
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # seed counters on first sight of the project, then record the
        # pending backlog we just loaded
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
161
162
    def _update_project_cnt(self, project):
163
        status_count = self.taskdb.status_count(project)
164
        self._cnt['all'].value(
165
            (project, 'success'),
166
            status_count.get(self.taskdb.SUCCESS, 0)
167
        )
168
        self._cnt['all'].value(
169
            (project, 'failed'),
170
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
171
        )
172
        self._cnt['all'].value(
173
            (project, 'pending'),
174
            status_count.get(self.taskdb.ACTIVE, 0)
175
        )
176
177
    def task_verify(self, task):
178
        '''
179
        return False if any of 'taskid', 'project', 'url' is not in task dict
180
                        or project in not in task_queue
181
        '''
182
        for each in ('taskid', 'project', 'url', ):
183
            if each not in task or not task[each]:
184
                logger.error('%s not in task: %.200r', each, task)
185
                return False
186
        if task['project'] not in self.task_queue:
187
            logger.error('unknown project: %s', task['project'])
188
            return False
189
        return True
190
191
    def insert_task(self, task):
192
        '''insert task into database'''
193
        return self.taskdb.insert(task['project'], task['taskid'], task)
194
195
    def update_task(self, task):
196
        '''update task in database'''
197
        return self.taskdb.update(task['project'], task['taskid'], task)
198
199
    def put_task(self, task):
200
        '''put task to task queue'''
201
        _schedule = task.get('schedule', self.default_schedule)
202
        self.task_queue[task['project']].put(
203
            task['taskid'],
204
            priority=_schedule.get('priority', self.default_schedule['priority']),
205
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
206
        )
207
208
    def send_task(self, task, force=True):
209
        '''
210
        dispatch task to fetcher
211
212
        out queue may have size limit to prevent block, a send_buffer is used
213
        '''
214
        try:
215
            self.out_queue.put_nowait(task)
216
        except Queue.Full:
217
            if force:
218
                self._send_buffer.appendleft(task)
219
            else:
220
                raise
221
222
    def _check_task_done(self):
        '''Drain the status queue, routing each pack to on_task_status.

        Returns the number of regular status packs handled (the special
        _on_get_info packs are merged into project state and not counted).
        '''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here: merge the reported runtime
                # info (min_tick, retry_delay, ...) into the cached project
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
242
243
    # fields fetched from taskdb when merging a new request with an old task
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        '''Drain up to LOOP_LIMIT new tasks, dedupe them, and process each.

        Returns the number of distinct tasks handed to on_request.
        '''
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # the queue may carry a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # skip tasks already queued, unless explicitly forced
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # dedupe within this batch (by taskid), again unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
278
279
    def _check_cronjob(self):
280
        """Check projects cronjob tick, return True when a new tick is sended"""
281
        now = time.time()
282
        self._last_tick = int(self._last_tick)
283
        if now - self._last_tick < 1:
284
            return False
285
        self._last_tick += 1
286
        for project in itervalues(self.projects):
287
            if project['status'] not in ('DEBUG', 'RUNNING'):
288
                continue
289
            if project.get('min_tick', 0) == 0:
290
                continue
291
            if self._last_tick % int(project['min_tick']) != 0:
292
                continue
293
            self.on_select_task({
294
                'taskid': '_on_cronjob',
295
                'project': project['name'],
296
                'url': 'data:,_on_cronjob',
297
                'status': self.taskdb.SUCCESS,
298
                'fetch': {
299
                    'save': {
300
                        'tick': self._last_tick,
301
                    },
302
                },
303
                'process': {
304
                    'callback': '_on_cronjob',
305
                },
306
            })
307
        return True
308
309
    # full field set handed to the fetcher/processor for a selected task
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        '''Select task to fetch & process.

        Flushes the send buffer first, then pulls up to LOOP_LIMIT ready
        taskids across projects (at most LOOP_LIMIT/10 per project) and
        dispatches them.  Returns a {project: selected_count} dict.
        '''
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                # out_queue still full: put the task back and stop for now
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        # load the selected tasks from taskdb and dispatch them
        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
362
363
    def _load_put_task(self, project, taskid):
364
        task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
365
        if not task:
366
            return
367
        task = self.on_select_task(task)
368
369
    def _print_counter_log(self):
        '''Log a one-line 5-minute summary plus the busiest / most-failing projects.'''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUG FIX: exclude projects already shown in top_2_fails by *name*;
        # the original compared a name (x[1]) against (count, name) tuples,
        # so the filter never excluded anything and projects could be listed
        # twice.
        fail_names = set(name for _, name in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
406
407
    def _dump_cnt(self):
408
        '''Dump counters to file'''
409
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
410
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
411
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
412
413
    def _try_dump_cnt(self):
414
        '''Dump counters every 60 seconds'''
415
        now = time.time()
416
        if now - self._last_dump_cnt > 60:
417
            self._last_dump_cnt = now
418
            self._dump_cnt()
419
            self._print_counter_log()
420
421
    def _check_delete(self):
        '''Drop projects that are STOPped, tagged 'delete', and untouched for
        DELETE_TIME — including their tasks, results, and counters.'''
        now = time.time()
        # iterate over a snapshot: the loop deletes from self.projects
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            # destructive: removes stored tasks, project record and results
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
            for each in self._cnt.values():
                del each[project['name']]
444
445
    def __len__(self):
        '''Total number of queued tasks across all projects.'''
        total = 0
        for each_queue in self.task_queue.values():
            total += len(each_queue)
        return total
447
448
    def quit(self):
        '''Set quit signal'''
        # checked by the run() and xmlrpc_run() loops
        self._quit = True
451
452
    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        # catch up cron ticks one second at a time
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
463
464
    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0  # a clean pass resets the failure streak
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                # give up after EXCEPTION_LIMIT consecutive failures
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        # persist counters so the next run can restore them
        self._dump_cnt()
484
485
    def trigger_on_start(self, project):
486
        '''trigger an on_start callback of project'''
487
        self.newtask_queue.put({
488
            "project": project,
489
            "taskid": "on_start",
490
            "url": "data:,on_start",
491
            "process": {
492
                "callback": "on_start",
493
            },
494
        })
495
496
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start the XML-RPC control interface; blocks until quit() is called.'''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            '''Return the counter group *_time* serialized with *_type*.'''
            try:
                return self._cnt[_time].to_dict(_type)
            # FIX: was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt
            except Exception:
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            '''Accept an external task; True when it passed verification.'''
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            '''Force a project reload on the next loop iteration.'''
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            '''Return up to *limit* recent tasks, newest first, filtered to safe keys.'''
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # merge the per-project deques by (updatetime, task), newest first
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        # short timeout so the loop re-checks the quit flag twice a second
        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
581
582
    def on_request(self, task):
        '''Route an incoming request to new-task or re-crawl handling.'''
        if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
            return

        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                       fields=self.merge_task_fields)
        if not oldtask:
            return self.on_new_request(task)
        return self.on_old_request(task, oldtask)
593
594
    def on_new_request(self, task):
        '''Called when a new request is arrived'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        # count one new pending task at every time resolution
        project = task['project']
        self._cnt['5m'].event((project, 'pending'), +1)
        self._cnt['1h'].event((project, 'pending'), +1)
        self._cnt['1d'].event((project, 'pending'), +1)
        self._cnt['all'].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
607
608
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        # restart when the itag changed, the last crawl is older than
        # 'age', or the caller forced an update
        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        # move counters: the task leaves its old terminal state and becomes
        # pending again
        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
643
644
    def on_task_status(self, task):
        '''Dispatch a status pack to done/failed handling and update timing counters.'''
        try:
            procesok = task['track']['process']['ok']
            if not self.task_queue[task['project']].done(task['taskid']):
                # FIX: use the module logger (was the root `logging` module),
                # so the message honors the scheduler's logging configuration
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # record fetch/process durations when the track carries them
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret
668
669
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                # auto-recrawl: stay ACTIVE and requeue after 'age' seconds
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
691
692
    def on_task_failed(self, task):
        '''Called when a task is failed, called by `on_task_status`.

        Either schedules a retry using the project's retry_delay back-off
        (falling back to DEFAULT_RETRY_DELAY) or, when retries are exhausted,
        marks the task FAILED.
        '''

        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # FIX: use the module logger with lazy args (was the root
                # `logging` module with eager %-formatting)
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                # no retries left: signal permanent failure below
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # FIX: lazy logging args instead of eager %-interpolation, matching
            # the rest of the module
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
744
745
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        # NOTE(review): `assert` is stripped under `python -O`; validity then
        # relies on callers only selecting tasks for known projects
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
758
759
760
from tornado import gen
761
762
763
class OneScheduler(Scheduler):
    """
    Scheduler mixin class for one mode (all components in a single process).

    Overwrites the send_task method:
    call processor.on_task(fetcher.fetch(task)) instead of consuming queue
    """

    def _check_select(self):
        """
        interactive mode of select tasks

        When not interactive, falls back to the normal selection loop.
        Otherwise opens a Python console exposing crawl()/run() helpers and
        blocks until the user schedules a task or quits.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # truthy while the user keeps scheduling; emptied by quit_pyspider()
        is_crawled = []

        def run(project=None):
            # convenience wrapper: crawl the project's on_start entry point
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package: build the task via the project's own
            # BaseHandler.crawl so schedule/fetch options are applied
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb: without explicit kwargs, prefer the stored
            # task (matched by taskid, then by url) over the freshly built one
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task and leave the console so it can be executed
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            # leave is_crawled empty so the ioloop is stopped below
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.stop()

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API

        In interactive mode unknown attributes resolve to their own name so
        callback=self.index_page evaluates to the string 'index_page'.
        """
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            super(OneScheduler, self).on_task_status(task)
        # NOTE(review): no return after the super() call above — in
        # non-interactive mode the task appears to be handled both by the
        # parent and again below; confirm this double handling is intended.

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        # record fetch/process timings into the 5-minute counters
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire the in-process fetcher/processor/result_worker components.

        Must be called before run(); interactive=True enables the console
        driven _check_select above.
        """
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        # count of tasks currently inside do_task; checked by _check_select
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch and process one task, then drain follow-up work in-process."""
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        # result.args carries (type, task, response) from the fetch callback;
        # note 'type' shadows the builtin inside this method
        type, task, response = result.args
        self.processor.on_task(task, response)
        # do with message: tasks the processor re-queued for itself
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results: hand extracted results to the result worker, if any
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        """Run the task in-process instead of pushing it to an out queue.

        When the fetcher's HTTP client has no free slot: buffer the task
        (force=True) or raise Full (force=False).
        """
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                self._send_buffer.appendleft(task)
            else:
                raise self.outqueue.Full
        # NOTE(review): the task is still dispatched below even when it was
        # just buffered above — confirm whether an early return was intended.
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        """Drive run_once() every 100ms on the tornado ioloop (blocking)."""
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        """Stop the ioloop, ending run()."""
        self.ioloop.stop()
        logger.info("scheduler exiting...")
923
924
925
import random
926
import threading
927
928
929
class ThreadBaseScheduler(Scheduler):
    """Scheduler that offloads work onto a pool of worker threads.

    Each worker thread owns a queue of (method, args, kwargs) jobs, and the
    taskdb/projectdb/resultdb properties hand every thread its own copy of
    the database connection via thread-local storage.
    """

    def __init__(self, threads=4, *args, **kwargs):
        # threads: size of the worker pool
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # super().__init__ stored the db handles through the setters below
        # (i.e. into this thread's local); keep the originals so other
        # threads can lazily .copy() them in the property getters.
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # lazily give the calling thread its own connection copy
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # lazily give the calling thread its own connection copy
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # lazily give the calling thread its own connection copy
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        """Spawn self.threads daemon workers, one job queue per worker."""
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        """Worker loop: pop jobs off this thread's queue and execute them.

        Exceptions are logged and swallowed so one bad job cannot kill the
        worker thread.
        """
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        """Enqueue method(*args, **kwargs) onto a worker thread.

        Keyword-only controls (popped before dispatch):
        _i     -- optional affinity key; jobs with the same _i always go to
                  the same worker queue (i % number of queues)
        _block -- if True, wait for an idle queue instead of picking a
                  random busy one, and wait for all queues to drain after
                  enqueueing
        """
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            while True:
                # prefer the first idle worker queue
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    # no idle queue: retry (blocking) or pick one at random
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        """Busy-wait until every worker queue is empty.

        NOTE(review): an empty queue only means the job was dequeued; the
        worker may still be executing its last job — confirm callers
        tolerate that window.
        """
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        # any worker may refresh project info
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # hash(taskid) pins every status update of one task to one worker,
        # serializing updates for the same task within this process
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        # same per-taskid affinity as on_task_status
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        # same per-taskid affinity as on_task_status
        i = hash(taskid)
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

    def run_once(self):
        """Run one scheduler iteration, then drain all worker queues."""
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()
1040