Completed
Push — ghost.py ( a374b0 )
by Roy
04:41 queued 02:49
created

pyspider.scheduler.Scheduler.insert_task()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import os
10
import json
11
import time
12
import logging
13
import itertools
14
from collections import deque
15
16
from six import iteritems, itervalues
17
18
from pyspider.libs import counter, utils
19
from six.moves import queue as Queue
20
from .task_queue import TaskQueue
21
logger = logging.getLogger('scheduler')
22
23
24
class Scheduler(object):
    """
    Task scheduler.

    Consumes new-task and status packs coming from the processor, keeps one
    in-memory TaskQueue per project, and feeds selected tasks to the fetcher
    through out_queue.
    """

    # seconds between polls of projectdb for changed projects
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # defaults applied when a task carries no 'schedule' dict of its own
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    # max packs handled per queue in one run_once iteration
    LOOP_LIMIT = 1000
    # seconds slept between run_once iterations (see run())
    LOOP_INTERVAL = 0.1
    # max length of each project's recent-task deque ('active_tasks')
    ACTIVE_TASKS = 100
    # per-project task queue size cap; 0 disables the check (_check_request)
    INQUEUE_LIMIT = 0
    # consecutive run_once exceptions tolerated before run() gives up
    EXCEPTION_LIMIT = 3
    # a STOP'd project in the 'delete' group is dropped after this many seconds
    DELETE_TIME = 24 * 60 * 60
    # retry back-off in seconds keyed by retry count; '' is the fallback key
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
46
47
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """
        Wire up databases, queues and in-memory state.

        taskdb / projectdb / resultdb -- database connectors
        newtask_queue -- incoming new-task packs from the processor
        status_queue  -- incoming status packs from the processor
        out_queue     -- outgoing tasks for the fetcher
        data_path     -- directory used to persist counter dumps
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue; retried in
        # _check_select before new tasks are selected
        self._send_buffer = deque()
        self._quit = False
        # consecutive run_once failures, compared against EXCEPTION_LIMIT
        self._exceptions = 0
        # project name -> merged project record (plus runtime extras)
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        # project name -> TaskQueue
        self.task_queue = dict()
        self._last_tick = int(time.time())

        # counters over several time windows; '5m_time' records event values
        # (fetch/process durations, see on_task_status), the others count
        # event occurrences
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore counters persisted by a previous run (_dump_cnt)
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
82
83
    def _update_projects(self):
84
        '''Check project update'''
85
        now = time.time()
86
        if (
87
                not self._force_update_project
88
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
89
        ):
90
            return
91
        for project in self.projectdb.check_update(self._last_update_project):
92
            self._update_project(project)
93
            logger.debug("project: %s updated.", project['name'])
94
        self._force_update_project = False
95
        self._last_update_project = now
96
97
    def _update_project(self, project):
        '''
        Apply one changed project record.

        Merges the record into self.projects, refreshes the script md5sum,
        and creates/tunes or tears down the project's TaskQueue according
        to its status.
        '''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        # md5sum lets workers detect a changed script (sent in on_select_task)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            # stopped/paused project: zero the rate limits and drop the queue
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
132
133
    # fields loaded from taskdb when rebuilding a project's queue
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]

    def _load_tasks(self, project):
        '''
        Rebuild the project's in-memory TaskQueue from ACTIVE tasks in taskdb,
        then seed the 'all' counters from the persisted status counts.
        '''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # rate limits follow the project's current status
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # first time this project is seen: seed totals from taskdb counts
        if project not in self._cnt['all']:
            status_count = self.taskdb.status_count(project)
            self._cnt['all'].value(
                (project, 'success'),
                status_count.get(self.taskdb.SUCCESS, 0)
            )
            self._cnt['all'].value(
                (project, 'failed'),
                status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
            )
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
166
167
    def task_verify(self, task):
168
        '''
169
        return False if any of 'taskid', 'project', 'url' is not in task dict
170
                        or project in not in task_queue
171
        '''
172
        for each in ('taskid', 'project', 'url', ):
173
            if each not in task or not task[each]:
174
                logger.error('%s not in task: %.200r', each, task)
175
                return False
176
        if task['project'] not in self.task_queue:
177
            logger.error('unknown project: %s', task['project'])
178
            return False
179
        return True
180
181
    def insert_task(self, task):
182
        '''insert task into database'''
183
        return self.taskdb.insert(task['project'], task['taskid'], task)
184
185
    def update_task(self, task):
186
        '''update task in database'''
187
        return self.taskdb.update(task['project'], task['taskid'], task)
188
189
    def put_task(self, task):
190
        '''put task to task queue'''
191
        _schedule = task.get('schedule', self.default_schedule)
192
        self.task_queue[task['project']].put(
193
            task['taskid'],
194
            priority=_schedule.get('priority', self.default_schedule['priority']),
195
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
196
        )
197
198
    def send_task(self, task, force=True):
199
        '''
200
        dispatch task to fetcher
201
202
        out queue may have size limit to prevent block, a send_buffer is used
203
        '''
204
        try:
205
            self.out_queue.put_nowait(task)
206
        except Queue.Full:
207
            if force:
208
                self._send_buffer.appendleft(task)
209
            else:
210
                raise
211
212
    def _check_task_done(self):
213
        '''Check status queue'''
214
        cnt = 0
215
        try:
216
            while True:
217
                task = self.status_queue.get_nowait()
218
                # check _on_get_info result here
219
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
220
                    self.projects[task['project']].update(task['track'].get('save') or {})
221
                    logger.info(
222
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
223
                    )
224
                    continue
225
                elif not self.task_verify(task):
226
                    continue
227
                self.on_task_status(task)
228
                cnt += 1
229
        except Queue.Empty:
230
            pass
231
        return cnt
232
233
    # fields fetched from taskdb to decide whether a known task restarts
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']

    def _check_request(self):
        '''
        Drain up to LOOP_LIMIT packs from newtask_queue, de-duplicate them by
        taskid, then route each task to on_new_request / on_old_request.
        '''
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # a pack may be a single task dict or a list of task dicts
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already in the project queue: skip unless force_update is set
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: only a force_update overwrites
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            # enforce the optional per-project in-queue cap
            if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
                logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
                continue

            # known task -> restart decision; unknown task -> insert as new
            oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                           fields=self.merge_task_fields)
            if oldtask:
                task = self.on_old_request(task, oldtask)
            else:
                task = self.on_new_request(task)

        return len(tasks)
277
278
    def _check_cronjob(self):
        """Check projects cronjob tick, return True when a new tick is sended"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            # less than one second since the last tick; nothing to do yet
            return False
        # advance exactly one tick per call; run_once loops until caught up
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            # a min_tick of 0 disables cron for the project
            if project.get('min_tick', 0) == 0:
                continue
            # fire only when the tick lines up with the project's interval
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            # synthetic task that invokes the project's _on_cronjob callback
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
307
308
    # fields fetched from taskdb before a task is handed to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]

    def _check_select(self):
        '''
        Select tasks to fetch & process.

        Returns a dict of project -> number of tasks selected this round.
        '''
        # first retry tasks parked in _send_buffer by an earlier full out_queue
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            # each project gets at most a tenth of the per-loop budget
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        # hydrate the selected taskids from taskdb and dispatch them
        for project, taskid in taskids:
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
            if not task:
                continue
            task = self.on_select_task(task)

        return cnt_dict
364
365
    def _print_counter_log(self):
        '''
        Log a 5-minute summary line plus per-project stats for the top two
        failing projects and the most active remaining projects (5 entries
        in total).
        '''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUGFIX: compare project names against names, not against the
        # (count, name) tuples -- `x[1] not in top_2_fails` never matched,
        # so top-failing projects were duplicated in the actives list.
        fail_names = set(name for _, name in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
402
403
    def _dump_cnt(self):
404
        '''Dump counters to file'''
405
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
406
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
407
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
408
409
    def _try_dump_cnt(self):
410
        '''Dump counters every 60 seconds'''
411
        now = time.time()
412
        if now - self._last_dump_cnt > 60:
413
            self._last_dump_cnt = now
414
            self._dump_cnt()
415
            self._print_counter_log()
416
417
    def _check_delete(self):
        '''
        Drop projects that have been STOP'd for longer than DELETE_TIME and
        whose group contains 'delete', removing their queue, records and data.
        '''
        now = time.time()
        # list() because the loop body mutates self.projects
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
438
439
    def __len__(self):
        '''Total number of queued tasks across all projects.'''
        total = 0
        for task_queue in itervalues(self.task_queue):
            total += len(task_queue)
        return total
441
442
    def quit(self):
        '''Set quit signal'''
        # observed by run() and xmlrpc_run(); also exposed over RPC as '_quit'
        self._quit = True
445
446
    def run_once(self):
        '''Consume queues and feed tasks to fetcher, once.'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        # drain all pending cron ticks (one tick per call) before selecting
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
457
458
    def run(self):
        '''
        Start the scheduler loop.

        Runs run_once every LOOP_INTERVAL seconds until quit() is called;
        tolerates up to EXCEPTION_LIMIT consecutive failures before exiting.
        Counters are dumped on the way out.
        '''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a clean iteration resets the consecutive-failure count
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()
478
479
    def trigger_on_start(self, project):
480
        '''trigger an on_start callback of project'''
481
        self.newtask_queue.put({
482
            "project": project,
483
            "taskid": "on_start",
484
            "url": "data:,on_start",
485
            "process": {
486
                "callback": "on_start",
487
            },
488
        })
489
490
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''
        Start the xmlrpc control interface.

        Blocks, serving requests until quit() is called. Exposes: _quit,
        size, counter, newtask, send_task, update_project, get_active_tasks.
        '''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # return the chosen counter window as a plain dict
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                # NOTE(review): bare except keeps a bad counter query from
                # killing the RPC loop; consider narrowing the exception
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            # accept a task pack from outside; True when queued
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            # force _update_projects() to poll projectdb on its next run
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # merge each project's recent-task deques (newest first),
            # stripping fields not in the whitelists below
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                # pick the newest (largest timestamp) head among all iterators
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        # short timeout so the loop re-checks self._quit twice a second
        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
575
576
    def on_new_request(self, task):
        '''Handle a never-before-seen request: store it, queue it, count it.'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        for span in ('5m', '1h', '1d', 'all'):
            self._cnt[span].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
589
590
    def on_old_request(self, task, old_task):
        '''
        Handle a request whose taskid already exists in taskdb.

        Restarts the task when its itag changed, its age has expired, or
        force_update is set; otherwise the pack is ignored. Returns the task
        when restarted, None when ignored.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            # content tag changed since last crawl
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            # last crawl is older than the allowed age
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # a task that was already ACTIVE is not counted as newly pending
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        # move the task between the 'all' totals buckets
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
625
626
    def on_task_status(self, task):
627
        '''Called when a status pack is arrived'''
628
        try:
629
            procesok = task['track']['process']['ok']
630
            if not self.task_queue[task['project']].done(task['taskid']):
631
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
632
                return None
633
        except KeyError as e:
634
            logger.error("Bad status pack: %s", e)
635
            return None
636
637
        if procesok:
638
            ret = self.on_task_done(task)
639
        else:
640
            ret = self.on_task_failed(task)
641
642
        if task['track']['fetch'].get('time'):
643
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
644
                                       task['track']['fetch']['time'])
645
        if task['track']['process'].get('time'):
646
            self._cnt['5m_time'].event((task['project'], 'process_time'),
647
                                       task['track']['process'].get('time'))
648
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
649
        return ret
650
651
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                # auto_recrawl: keep the task ACTIVE and requeue it to run
                # again 'age' seconds from now
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                # one-shot task: the schedule is no longer needed
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
673
674
    def on_task_failed(self, task):
        '''
        Called when a task has failed, called by `on_task_status`.

        Chooses between a delayed retry and a permanent FAILED status based
        on the task's schedule (retries/retried/age/auto_recrawl) and the
        project's retry_delay table.
        '''
        if 'schedule' not in task:
            # the status pack lost its schedule; recover it from taskdb
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # CONSISTENCY FIX: use the module logger with lazy args, not
                # the root logger with eager % formatting
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        # per-project retry_delay (set via _on_get_info) falls back to the
        # class default; '' is the catch-all key for high retry counts
        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto_recrawl tasks never go FAILED; retry no later than 'age'
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                # retry budget exhausted: mark as permanently failed below
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # CONSISTENCY FIX: lazy logger args like the 'task done'/'new
            # task' log lines, instead of pre-formatting with % task
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            # the retry counts are pre-formatted; the doubled %% survive to be
            # filled from the task dict by the logger
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
726
727
    def on_select_task(self, task):
        '''
        Called when a task is selected to fetch & process.

        Injects project metadata (group, script md5sum, updatetime) so the
        fetcher/processor can detect stale scripts, records the task in the
        project's active_tasks deque, and hands it to the fetcher.
        '''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        # NOTE(review): assert is stripped under python -O; callers rely on
        # the project existing here
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
740
741
742
from tornado import gen
743
744
745
class OneScheduler(Scheduler):
    """
    Scheduler mixin class for one mode.

    Overwrites the send_task method:
    calls processor.on_task(fetcher.fetch(task)) in-process instead of
    handing the task over via the output queue.
    """
753
    def _check_select(self):
        """
        interactive mode of select tasks

        Instead of pulling tasks from the task queues (parent behaviour),
        open an interactive Python console and let the user pick tasks
        to crawl by hand.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # Sentinel list: non-empty after the console exits means a crawl
        # was issued (or interactive mode was left) and the loop goes on;
        # empty means quit_pyspider() was called and the loop must stop.
        is_crawled = []

        def run(project=None):
            # Shortcut: crawl the project's 'on_start' entry point.
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb
            # With no explicit kwargs, prefer the stored task (looked up
            # by taskid, falling back to url) so saved fields apply.
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        # NOTE: `shell` is captured by the closures above via late binding;
        # it must be created before any of them runs.
        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.stop()
832
    def __getattr__(self, name):
833
        """patch for crawl(url, callback=self.index_page) API"""
834
        if self.interactive:
835
            return name
836
        raise AttributeError(name)
837
838
    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # The parent implementation already performs the complete
            # done/failed accounting; return its result here so the task
            # is not processed (and counted) a second time below.
            return super(OneScheduler, self).on_task_status(task)

        # Interactive mode: same bookkeeping as the parent, minus the
        # "task not in queue" sanity check.
        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # Fold fetch/process durations into the 5-minute timing counters.
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        # Record the task as recently active.
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret
862
    def init_one(self, ioloop, fetcher, processor,
863
                 result_worker=None, interactive=False):
864
        self.ioloop = ioloop
865
        self.fetcher = fetcher
866
        self.processor = processor
867
        self.result_worker = result_worker
868
        self.interactive = interactive
869
        self.running_task = 0
870
871
    @gen.coroutine
    def do_task(self, task):
        '''Fetch and process a single task inline on the IOLoop.'''
        self.running_task += 1
        try:
            # fetcher.fetch uses a callback-style API; gen.Task adapts it
            # into something that can be yielded.
            result = yield gen.Task(self.fetcher.fetch, task)
            _type, task, response = result.args  # renamed: `type` shadowed the builtin
            self.processor.on_task(task, response)
            # do with message
            while not self.processor.inqueue.empty():
                _task, _response = self.processor.inqueue.get()
                self.processor.on_task(_task, _response)
            # do with results
            while not self.processor.result_queue.empty():
                _task, _result = self.processor.result_queue.get()
                if self.result_worker:
                    self.result_worker.on_result(_task, _result)
        finally:
            # Always decrement: a raising task would otherwise leave
            # running_task > 0 and stall interactive _check_select forever.
            self.running_task -= 1
888
    def send_task(self, task, force=True):
889
        if self.fetcher.http_client.free_size() <= 0:
890
            if force:
891
                self._send_buffer.appendleft(task)
892
            else:
893
                raise self.outqueue.Full
894
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())
895
896
    def run(self):
        """Run the one-mode scheduler loop on the tornado IOLoop."""
        import tornado.ioloop

        # Tick run_once every 100 ms, then block inside the IOLoop.
        ticker = tornado.ioloop.PeriodicCallback(
            self.run_once, 100, io_loop=self.ioloop)
        ticker.start()
        self.ioloop.start()
902
    def quit(self):
        '''Stop the IOLoop, which ends run() and shuts this scheduler down.'''
        self.ioloop.stop()
        logger.info("scheduler exiting...")
905