Completed
Push — master ( 01fcff...6a66d3 )
by Roy
01:07
created

pyspider.scheduler.Scheduler.__len__()   A

Complexity

Conditions 2

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 2
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import os
10
import json
11
import time
12
import logging
13
import itertools
14
from collections import deque
15
16
from six import iteritems, itervalues
17
18
from pyspider.libs import counter, utils
19
from pyspider.libs.queue import Queue
20
from .task_queue import TaskQueue
21
logger = logging.getLogger('scheduler')
22
23
24
class Scheduler(object):
25
    UPDATE_PROJECT_INTERVAL = 5 * 60
26
    default_schedule = {
27
        'priority': 0,
28
        'retries': 3,
29
        'exetime': 0,
30
        'age': -1,
31
        'itag': None,
32
    }
33
    LOOP_LIMIT = 1000
34
    LOOP_INTERVAL = 0.1
35
    ACTIVE_TASKS = 100
36
    INQUEUE_LIMIT = 0
37
    EXCEPTION_LIMIT = 3
38
    DELETE_TIME = 24 * 60 * 60
39
    DEFAULT_RETRY_DELAY = {
40
        0: 30,
41
        1: 1*60*60,
42
        2: 6*60*60,
43
        3: 12*60*60,
44
        '': 24*60*60
45
    }
46
47
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''
        Wire the scheduler to its storage backends and message queues.

        taskdb / projectdb / resultdb -- database accessors; resultdb may be
            None and is only used when a deleted project's results are dropped.
        newtask_queue -- incoming new-task requests (from processors).
        status_queue  -- status packs reporting fetch/process outcomes.
        out_queue     -- outgoing tasks dispatched to the fetcher.
        data_path     -- directory where counter snapshots are loaded/dumped.
        '''
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue wait here and
        # are retried by _check_select()
        self._send_buffer = deque()
        self._quit = False
        self._exceptions = 0  # consecutive run_once() failures; see run()
        self.projects = dict()  # project name -> project info dict
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()  # project name -> TaskQueue
        self._last_tick = int(time.time())

        # windowed counters for stats/logging; '5m_time' averages event
        # values (timings) while the others count event occurrences
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore the counters persisted by _dump_cnt() on a previous run
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
82
83
    def _update_projects(self):
84
        '''Check project update'''
85
        now = time.time()
86
        if (
87
                not self._force_update_project
88
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
89
        ):
90
            return
91
        for project in self.projectdb.check_update(self._last_update_project):
92
            self._update_project(project)
93
            logger.debug("project: %s updated.", project['name'])
94
        self._force_update_project = False
95
        self._last_update_project = now
96
97
    def _update_project(self, project):
        '''Merge a fresh projectdb record into self.projects and
        create/refresh/remove the project's task queue accordingly.'''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        # script checksum lets downstream components detect stale payloads
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            # bounded history of recently handled tasks (served over xmlrpc)
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load the task queue when the project is running; delete it when
        # the project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            # stopped/paused: zero the crawl allowance, then drop the queue
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
132
133
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
134
135
    def _load_tasks(self, project):
        '''Load the project's ACTIVE tasks from taskdb into a new TaskQueue.'''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # only a RUNNING/DEBUG project is allowed to dispatch (rate/burst > 0)
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # seed the all-time counters from taskdb the first time this project
        # is seen, so statistics survive scheduler restarts
        if project not in self._cnt['all']:
            status_count = self.taskdb.status_count(project)
            self._cnt['all'].value(
                (project, 'success'),
                status_count.get(self.taskdb.SUCCESS, 0)
            )
            self._cnt['all'].value(
                (project, 'failed'),
                status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
            )
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
166
167
    def task_verify(self, task):
168
        '''
169
        return False if any of 'taskid', 'project', 'url' is not in task dict
170
                        or project in not in task_queue
171
        '''
172
        for each in ('taskid', 'project', 'url', ):
173
            if each not in task or not task[each]:
174
                logger.error('%s not in task: %.200r', each, task)
175
                return False
176
        if task['project'] not in self.task_queue:
177
            logger.error('unknown project: %s', task['project'])
178
            return False
179
        return True
180
181
    def insert_task(self, task):
182
        '''insert task into database'''
183
        return self.taskdb.insert(task['project'], task['taskid'], task)
184
185
    def update_task(self, task):
186
        '''update task in database'''
187
        return self.taskdb.update(task['project'], task['taskid'], task)
188
189
    def put_task(self, task):
190
        '''put task to task queue'''
191
        _schedule = task.get('schedule', self.default_schedule)
192
        self.task_queue[task['project']].put(
193
            task['taskid'],
194
            priority=_schedule.get('priority', self.default_schedule['priority']),
195
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
196
        )
197
198
    def send_task(self, task, force=True):
199
        '''
200
        dispatch task to fetcher
201
202
        out queue may have size limit to prevent block, a send_buffer is used
203
        '''
204
        try:
205
            self.out_queue.put_nowait(task)
206
        except Queue.Full:
207
            if force:
208
                self._send_buffer.appendleft(task)
209
            else:
210
                raise
211
212
    def _check_task_done(self):
        '''Drain the status queue; returns the number of status packs handled.'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    # runtime info reported by the processor (min_tick,
                    # retry_delay, ...); merge it into the project record
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
232
233
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
234
235
    def _check_request(self):
        '''Consume up to LOOP_LIMIT new-task requests from newtask_queue;
        returns how many distinct tasks were accepted this round.'''
        # dedupe within this batch by taskid; a later duplicate only wins
        # when it carries schedule.force_update
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # the queue may carry a single task dict or a list of them
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued: ignore unless schedule.force_update is set
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            # optional cap on a project's in-memory queue size (0 = no cap)
            if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
                logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
                continue

            # route to the old/new handler depending on whether taskdb
            # already knows this taskid
            oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                           fields=self.merge_task_fields)
            if oldtask:
                task = self.on_old_request(task, oldtask)
            else:
                task = self.on_new_request(task)

        return len(tasks)
277
278
    def _check_cronjob(self):
        """Advance the cron tick by one second; return True when a tick was
        sent (run_once loops on this to catch up with wall-clock time)."""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            # min_tick is the project's cron granularity in seconds
            # (0 or missing = the project defines no cronjobs)
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            # synthesize an _on_cronjob task; the processor fans it out to
            # the project's @every callbacks
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
307
308
    request_task_fields = [
309
        'taskid',
310
        'project',
311
        'url',
312
        'status',
313
        'schedule',
314
        'fetch',
315
        'process',
316
        'track',
317
        'lastcrawltime'
318
    ]
319
320
    def _check_select(self):
        '''Select ready tasks (up to LOOP_LIMIT) from the per-project queues
        and dispatch them to the fetcher; returns {project: selected count}.'''
        # first retry tasks that previously failed to enqueue
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            # one project may take at most 1/10 of the batch so a busy
            # project cannot starve the others
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        for project, taskid in taskids:
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
            if not task:
                # task disappeared from taskdb since it was queued; skip it
                continue
            task = self.on_select_task(task)

        return cnt_dict
364
365
    def _print_counter_log(self):
366
        # print top 5 active counters
367
        keywords = ('pending', 'success', 'retry', 'failed')
368
        total_cnt = {}
369
        project_actives = []
370
        project_fails = []
371
        for key in keywords:
372
            total_cnt[key] = 0
373
        for project, subcounter in iteritems(self._cnt['5m']):
374
            actives = 0
375
            for key in keywords:
376
                cnt = subcounter.get(key, None)
377
                if cnt:
378
                    cnt = cnt.sum
379
                    total_cnt[key] += cnt
380
                    actives += cnt
381
382
            project_actives.append((actives, project))
383
384
            fails = subcounter.get('failed', None)
385
            if fails:
386
                project_fails.append((fails.sum, project))
387
388
        top_2_fails = sorted(project_fails, reverse=True)[:2]
389
        top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails],
390
                               reverse=True)[:5 - len(top_2_fails)]
391
392
        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
393
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
394
        for _, project in itertools.chain(top_3_actives, top_2_fails):
395
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
396
            log_str += " %s:%d,%d,%d,%d" % (project,
397
                                            subcounter.get('pending', 0),
398
                                            subcounter.get('success', 0),
399
                                            subcounter.get('retry', 0),
400
                                            subcounter.get('failed', 0))
401
        logger.info(log_str)
402
403
    def _dump_cnt(self):
404
        '''Dump counters to file'''
405
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
406
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
407
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
408
409
    def _try_dump_cnt(self):
410
        '''Dump counters every 60 seconds'''
411
        now = time.time()
412
        if now - self._last_dump_cnt > 60:
413
            self._last_dump_cnt = now
414
            self._dump_cnt()
415
            self._print_counter_log()
416
417
    def _check_delete(self):
        '''Drop projects that are STOPped, tagged with the 'delete' group,
        and untouched for at least DELETE_TIME seconds.'''
        now = time.time()
        # iterate a snapshot because the loop deletes from self.projects
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            # wipe every trace: tasks, project record and (if any) results
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
438
439
    def __len__(self):
440
        return sum(len(x) for x in itervalues(self.task_queue))
441
442
    def quit(self):
        '''Set quit signal — the run() and xmlrpc_run() loops exit after
        their current iteration.'''
        self._quit = True
445
446
    def run_once(self):
447
        '''comsume queues and feed tasks to fetcher, once'''
448
449
        self._update_projects()
450
        self._check_task_done()
451
        self._check_request()
452
        while self._check_cronjob():
453
            pass
454
        self._check_select()
455
        self._check_delete()
456
        self._try_dump_cnt()
457
458
    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                # tolerate transient failures but bail out after more than
                # EXCEPTION_LIMIT consecutive failing iterations
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        # flush counters so statistics survive the restart
        self._dump_cnt()
478
479
    def trigger_on_start(self, project):
480
        '''trigger an on_start callback of project'''
481
        self.newtask_queue.put({
482
            "project": project,
483
            "taskid": "on_start",
484
            "url": "data:,on_start",
485
            "process": {
486
                "callback": "on_start",
487
            },
488
        })
489
490
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''
        Serve the scheduler control API over XML-RPC until quit() is called.

        Exposed methods: _quit, size, counter, newtask, send_task,
        update_project, get_active_tasks.
        '''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            '''Return the counter window `_time` rendered as `_type`.'''
            try:
                return self._cnt[_time].to_dict(_type)
            except Exception:
                # BUG FIX: was a bare `except:` which also swallowed
                # SystemExit/KeyboardInterrupt; keep best-effort behavior
                # but only for ordinary exceptions
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            '''Accept a task if it passes task_verify.'''
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            '''Force a project refresh on the next run_once pass.'''
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            '''Return up to `limit` recently active tasks (newest first),
            merged across projects and stripped to JSON-safe keys.'''
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # k-way merge by updatetime over the per-project deques
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        # short timeout so self._quit (set from another thread) is noticed
        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
575
576
    def on_new_request(self, task):
577
        '''Called when a new request is arrived'''
578
        task['status'] = self.taskdb.ACTIVE
579
        self.insert_task(task)
580
        self.put_task(task)
581
582
        project = task['project']
583
        self._cnt['5m'].event((project, 'pending'), +1)
584
        self._cnt['1h'].event((project, 'pending'), +1)
585
        self._cnt['1d'].event((project, 'pending'), +1)
586
        self._cnt['all'].event((project, 'pending'), +1)
587
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
588
        return task
589
590
    def on_old_request(self, task, old_task):
        '''
        Handle a request whose taskid already exists in taskdb.

        The task is restarted only when one of these holds:
        * schedule.itag changed since the last crawl,
        * the last crawl is older than schedule.age seconds,
        * schedule.force_update is set.
        Returns the requeued task, or None when the request is ignored.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # a task that was already ACTIVE must not be counted as pending twice
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        # move the task between the all-time buckets it switches out of
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
625
626
    def on_task_status(self, task):
627
        '''Called when a status pack is arrived'''
628
        try:
629
            procesok = task['track']['process']['ok']
630
            if not self.task_queue[task['project']].done(task['taskid']):
631
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
632
                return None
633
        except KeyError as e:
634
            logger.error("Bad status pack: %s", e)
635
            return None
636
637
        if procesok:
638
            ret = self.on_task_done(task)
639
        else:
640
            ret = self.on_task_failed(task)
641
642
        if task['track']['fetch'].get('time'):
643
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
644
                                       task['track']['fetch']['time'])
645
        if task['track']['process'].get('time'):
646
            self._cnt['5m_time'].event((task['project'], 'process_time'),
647
                                       task['track']['process'].get('time'))
648
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
649
        return ret
650
651
    def on_task_done(self, task):
652
        '''Called when a task is done and success, called by `on_task_status`'''
653
        task['status'] = self.taskdb.SUCCESS
654
        task['lastcrawltime'] = time.time()
655
656
        if 'schedule' in task:
657
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
658
                task['status'] = self.taskdb.ACTIVE
659
                next_exetime = task['schedule'].get('age')
660
                task['schedule']['exetime'] = time.time() + next_exetime
661
                self.put_task(task)
662
            else:
663
                del task['schedule']
664
        self.update_task(task)
665
666
        project = task['project']
667
        self._cnt['5m'].event((project, 'success'), +1)
668
        self._cnt['1h'].event((project, 'success'), +1)
669
        self._cnt['1d'].event((project, 'success'), +1)
670
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
671
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
672
        return task
673
674
    def on_task_failed(self, task):
675
        '''Called when a task is failed, called by `on_task_status`'''
676
677
        if 'schedule' not in task:
678
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
679
            if old_task is None:
680
                logging.error('unknown status pack: %s' % task)
681
                return
682
            task['schedule'] = old_task.get('schedule', {})
683
684
        retries = task['schedule'].get('retries', self.default_schedule['retries'])
685
        retried = task['schedule'].get('retried', 0)
686
687
        project_info = self.projects.get(task['project'], {})
688
        retry_delay = project_info.get('retry_delay', self.DEFAULT_RETRY_DELAY)
689
        next_exetime = retry_delay.get(retried, retry_delay[''])
690
691
        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
692
            next_exetime = min(next_exetime, task['schedule'].get('age'))
693
        elif retried >= retries:
694
            next_exetime = -1
695
696
        if next_exetime < 0:
697
            task['status'] = self.taskdb.FAILED
698
            task['lastcrawltime'] = time.time()
699
            self.update_task(task)
700
701
            project = task['project']
702
            self._cnt['5m'].event((project, 'failed'), +1)
703
            self._cnt['1h'].event((project, 'failed'), +1)
704
            self._cnt['1d'].event((project, 'failed'), +1)
705
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
706
            logger.info('task failed %(project)s:%(taskid)s %(url)s' % task)
707
            return task
708
        else:
709
            task['schedule']['retried'] = retried + 1
710
            task['schedule']['exetime'] = time.time() + next_exetime
711
            task['lastcrawltime'] = time.time()
712
            self.update_task(task)
713
            self.put_task(task)
714
715
            project = task['project']
716
            self._cnt['5m'].event((project, 'retry'), +1)
717
            self._cnt['1h'].event((project, 'retry'), +1)
718
            self._cnt['1d'].event((project, 'retry'), +1)
719
            # self._cnt['all'].event((project, 'retry'), +1)
720
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
721
                retried, retries), task)
722
            return task
723
724
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process; injects
        project info into the task and dispatches it to the fetcher.'''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'
        # group/md5sum/updatetime let downstream components detect stale
        # scripts and route results correctly
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
737
738
739
from tornado import gen
740
741
742
class OneScheduler(Scheduler):
    """
    Scheduler Mixin class for one mode

    overwritten send_task method
    call processor.on_task(fetcher.fetch(task)) instead of consuming queue
    """

    def _check_select(self):
        """
        interactive mode of select tasks

        In non-interactive mode this defers to the base implementation.
        Otherwise it opens a Python console exposing run()/crawl()/
        quit_interactive()/quit_pyspider() helpers and selects at most one
        task per invocation.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # non-empty => a task was selected (or interactive mode was left);
        # emptied by quit_pyspider() to request a full shutdown below.
        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb: with no explicit kwargs, prefer the
            # stored task (by taskid, falling back to url) over the fresh one
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        # nothing crawled and interactive mode not merely exited => shut down
        if not is_crawled:
            self.ioloop.stop()

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: return here — falling through would repeat the
            # done/failed handling below on top of the base implementation,
            # double-counting counters and active_tasks.
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire the in-process fetcher/processor/result_worker components."""
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        # number of tasks currently being fetched/processed
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch and process one task in-process, draining follow-up
        messages and results synchronously."""
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        _type, task, response = result.args  # renamed: don't shadow builtin `type`
        self.processor.on_task(task, response)
        # do with message
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        """Execute the task in-process instead of pushing it to a queue.

        When the fetcher has no free slots: with force=True the task is
        buffered for the main loop to resend later, otherwise a Full
        exception is raised.
        """
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                self._send_buffer.appendleft(task)
                # BUGFIX: the buffered task is resent by the main loop;
                # return so it is not also executed immediately (duplicate).
                return
            # BUGFIX: was `self.outqueue.Full` — the attribute set in
            # __init__ is `out_queue`, so this always raised AttributeError.
            # NOTE(review): assumes the queue object exposes a Full
            # exception attribute — confirm against pyspider.libs.queue.
            raise self.out_queue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        """Run run_once() every 100ms on the tornado ioloop until stopped."""
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")