Completed
Push — master ( 24c5f2...b244b2 )
by Roy
01:41
created

pyspider.scheduler.Scheduler._update_project_cnt()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 13
rs 9.4286
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import os
10
import json
11
import time
12
import logging
13
import itertools
14
from collections import deque
15
16
from six import iteritems, itervalues
17
18
from pyspider.libs import counter, utils
19
from six.moves import queue as Queue
20
from .task_queue import TaskQueue
21
logger = logging.getLogger('scheduler')
22
23
24
class Scheduler(object):
    """Core pyspider scheduler: consumes newtask/status queues, keeps a
    per-project TaskQueue, and feeds selected tasks to the fetcher."""
    # Seconds between polls of projectdb for changed projects.
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # Fallback schedule used when a task carries no 'schedule' dict.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000        # max tasks handled per scheduler pass
    LOOP_INTERVAL = 0.1      # seconds slept between run() iterations
    ACTIVE_TASKS = 100       # per-project recent-task history length
    INQUEUE_LIMIT = 0        # per-project queue cap; 0 disables the limit
    EXCEPTION_LIMIT = 3      # consecutive run_once() failures tolerated
    DELETE_TIME = 24 * 60 * 60   # idle seconds before a STOP'ed project is dropped
    # Retry back-off in seconds keyed by retry count; '' is the catch-all.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """Wire the scheduler to its storage backends and queues.

        taskdb / projectdb / resultdb -- storage backends (resultdb is
            optional; only used to drop a deleted project's results).
        newtask_queue -- incoming crawl requests.
        status_queue  -- finished-task status packs from the processor.
        out_queue     -- tasks dispatched to the fetcher.
        data_path     -- directory where counter snapshots are persisted.
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()          # tasks parked while out_queue is full
        self._quit = False                   # set by quit() to stop the loops
        self._exceptions = 0                 # consecutive run_once() failures
        self.projects = dict()               # project name -> project info dict
        self._force_update_project = False   # set via xmlrpc 'update_project'
        self._last_update_project = 0        # timestamp of last projectdb poll
        self.task_queue = dict()             # project name -> TaskQueue
        self._last_tick = int(time.time())   # cron tick cursor (whole seconds)

        # Throughput counters over several windows; '5m_time' averages
        # fetch/process durations, the others count task-state events.
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # Restore the durable counters persisted by a previous run.
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0              # timestamp of last counter dump
    def _update_projects(self):
84
        '''Check project update'''
85
        now = time.time()
86
        if (
87
                not self._force_update_project
88
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
89
        ):
90
            return
91
        for project in self.projectdb.check_update(self._last_update_project):
92
            self._update_project(project)
93
            logger.debug("project: %s updated.", project['name'])
94
        self._force_update_project = False
95
        self._last_update_project = now
96
97
    def _update_project(self, project):
        '''Apply an updated project record (from projectdb) to scheduler state.

        Keeps self.projects[name] in sync, (re)creates or tears down the
        project's task queue depending on its status, and asks the processor
        for fresh runtime info via a synthetic `_on_get_info` task.
        '''
        name = project['name']
        if name not in self.projects:
            self.projects[name] = {}
        self.projects[name].update(project)
        self.projects[name]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[name].get('active_tasks', None):
            self.projects[name]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stoped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if name not in self.task_queue:
                self._load_tasks(name)
            self.task_queue[name].rate = project['rate']
            self.task_queue[name].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': name,
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if name in self.task_queue:
                self.task_queue[name].rate = 0
                self.task_queue[name].burst = 0
                del self.task_queue[name]

            # BUG FIX: the 'all' counter is keyed by project *name*; the
            # original tested `project` (the whole dict), which is never a
            # member, so the counters were rebuilt from taskdb on every
            # update of a stopped project.
            if name not in self._cnt['all']:
                self._update_project_cnt(name)
    # Minimal field set fetched from taskdb when rebuilding a task queue.
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
    def _load_tasks(self, project):
        '''Rebuild the in-memory task queue for *project* from taskdb.'''
        queue = TaskQueue(rate=0, burst=0)
        self.task_queue[project] = queue

        active_tasks = self.taskdb.load_tasks(
            self.taskdb.ACTIVE, project, self.scheduler_task_fields
        )
        for task in active_tasks:
            schedule = task.get('schedule', self.default_schedule)
            queue.put(
                task['taskid'],
                schedule.get('priority', self.default_schedule['priority']),
                schedule.get('exetime', self.default_schedule['exetime']),
            )
        logger.debug('project: %s loaded %d tasks.', project, len(queue))

        # Rate-limit the queue only while the project is actually running.
        project_info = self.projects[project]
        if project_info['status'] in ('RUNNING', 'DEBUG'):
            queue.rate = project_info['rate']
            queue.burst = project_info['burst']
        else:
            queue.rate = 0
            queue.burst = 0

        # Seed the cumulative counters on first sight of this project.
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(queue))
    def _update_project_cnt(self, project):
163
        status_count = self.taskdb.status_count(project)
164
        self._cnt['all'].value(
165
            (project, 'success'),
166
            status_count.get(self.taskdb.SUCCESS, 0)
167
        )
168
        self._cnt['all'].value(
169
            (project, 'failed'),
170
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
171
        )
172
        self._cnt['all'].value(
173
            (project, 'pending'),
174
            status_count.get(self.taskdb.ACTIVE, 0)
175
        )
176
177
    def task_verify(self, task):
178
        '''
179
        return False if any of 'taskid', 'project', 'url' is not in task dict
180
                        or project in not in task_queue
181
        '''
182
        for each in ('taskid', 'project', 'url', ):
183
            if each not in task or not task[each]:
184
                logger.error('%s not in task: %.200r', each, task)
185
                return False
186
        if task['project'] not in self.task_queue:
187
            logger.error('unknown project: %s', task['project'])
188
            return False
189
        return True
190
191
    def insert_task(self, task):
192
        '''insert task into database'''
193
        return self.taskdb.insert(task['project'], task['taskid'], task)
194
195
    def update_task(self, task):
196
        '''update task in database'''
197
        return self.taskdb.update(task['project'], task['taskid'], task)
198
199
    def put_task(self, task):
200
        '''put task to task queue'''
201
        _schedule = task.get('schedule', self.default_schedule)
202
        self.task_queue[task['project']].put(
203
            task['taskid'],
204
            priority=_schedule.get('priority', self.default_schedule['priority']),
205
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
206
        )
207
208
    def send_task(self, task, force=True):
209
        '''
210
        dispatch task to fetcher
211
212
        out queue may have size limit to prevent block, a send_buffer is used
213
        '''
214
        try:
215
            self.out_queue.put_nowait(task)
216
        except Queue.Full:
217
            if force:
218
                self._send_buffer.appendleft(task)
219
            else:
220
                raise
221
222
    def _check_task_done(self):
223
        '''Check status queue'''
224
        cnt = 0
225
        try:
226
            while True:
227
                task = self.status_queue.get_nowait()
228
                # check _on_get_info result here
229
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
230
                    self.projects[task['project']].update(task['track'].get('save') or {})
231
                    logger.info(
232
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
233
                    )
234
                    continue
235
                elif not self.task_verify(task):
236
                    continue
237
                self.on_task_status(task)
238
                cnt += 1
239
        except Queue.Empty:
240
            pass
241
        return cnt
242
243
    # Fields fetched when an incoming request duplicates a task already in taskdb.
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
    def _check_request(self):
        '''Check new task queue'''
        # Drain up to LOOP_LIMIT new tasks, de-duplicating by taskid, then
        # route each through on_new_request / on_old_request.
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # The queue may carry a single task dict or a batch (list) of them.
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # Already queued in memory: skip unless the sender forces an update.
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # Duplicate within this batch: first one wins unless forced.
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            # Optional back-pressure: drop when the project queue is over limit.
            if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
                logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
                continue

            # A task already known to taskdb is a re-crawl request, not a new one.
            oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                           fields=self.merge_task_fields)
            if oldtask:
                task = self.on_old_request(task, oldtask)
            else:
                task = self.on_new_request(task)

        return len(tasks)
    def _check_cronjob(self):
289
        """Check projects cronjob tick, return True when a new tick is sended"""
290
        now = time.time()
291
        self._last_tick = int(self._last_tick)
292
        if now - self._last_tick < 1:
293
            return False
294
        self._last_tick += 1
295
        for project in itervalues(self.projects):
296
            if project['status'] not in ('DEBUG', 'RUNNING'):
297
                continue
298
            if project.get('min_tick', 0) == 0:
299
                continue
300
            if self._last_tick % int(project['min_tick']) != 0:
301
                continue
302
            self.on_select_task({
303
                'taskid': '_on_cronjob',
304
                'project': project['name'],
305
                'url': 'data:,_on_cronjob',
306
                'status': self.taskdb.SUCCESS,
307
                'fetch': {
308
                    'save': {
309
                        'tick': self._last_tick,
310
                    },
311
                },
312
                'process': {
313
                    'callback': '_on_cronjob',
314
                },
315
            })
316
        return True
317
318
    # Full field set loaded from taskdb for a task handed to the fetcher.
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
    def _check_select(self):
        '''Select task to fetch & process'''
        # First retry tasks parked in _send_buffer; stop at the first
        # Queue.Full so their order is preserved.
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        # Round-robin over project queues: at most LOOP_LIMIT tasks in total
        # and LOOP_LIMIT/10 per project, so one project cannot starve others.
        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        # Hydrate the selected ids from taskdb and hand them to the fetcher.
        for project, taskid in taskids:
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
            if not task:
                continue
            task = self.on_select_task(task)

        return cnt_dict
    def _print_counter_log(self):
        '''Log 5-minute totals plus the busiest projects (top failures first).'''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUG FIX: exclude by project *name*. The original tested
        # `x[1] not in top_2_fails`, comparing a name against (count, name)
        # tuples -- the filter never matched, so a top-failing project could
        # also appear among the actives and be logged twice.
        fail_names = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
    def _dump_cnt(self):
414
        '''Dump counters to file'''
415
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
416
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
417
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
418
419
    def _try_dump_cnt(self):
420
        '''Dump counters every 60 seconds'''
421
        now = time.time()
422
        if now - self._last_dump_cnt > 60:
423
            self._last_dump_cnt = now
424
            self._dump_cnt()
425
            self._print_counter_log()
426
427
    def _check_delete(self):
        '''Drop projects that are STOP'ed, idle past DELETE_TIME, and carry
        the 'delete' group tag.'''
        now = time.time()
        # iterate over a snapshot: we delete from self.projects inside the loop
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            name = project['name']
            logger.warning("deleting project: %s!", name)
            if name in self.task_queue:
                self.task_queue[name].rate = 0
                self.task_queue[name].burst = 0
                del self.task_queue[name]
            del self.projects[name]
            self.taskdb.drop(name)
            self.projectdb.drop(name)
            if self.resultdb:
                self.resultdb.drop(name)
    def __len__(self):
        '''Total number of queued tasks across every project.'''
        return sum(map(len, itervalues(self.task_queue)))
    def quit(self):
453
        '''Set quit signal'''
454
        self._quit = True
455
456
    def run_once(self):
        '''Consume queues and feed tasks to the fetcher, once.'''
        # One scheduler pass: refresh projects, absorb finished-task status,
        # absorb new requests, catch up all pending cron ticks, dispatch
        # tasks, reap deletable projects, and (rate-limited) dump counters.
        self._update_projects()
        self._check_task_done()
        self._check_request()
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0  # a clean pass resets the failure streak
            except KeyboardInterrupt:
                break
            except Exception as e:
                # Tolerate up to EXCEPTION_LIMIT *consecutive* failures
                # before giving up on the loop entirely.
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()  # final counter snapshot on shutdown
    def trigger_on_start(self, project):
490
        '''trigger an on_start callback of project'''
491
        self.newtask_queue.put({
492
            "project": project,
493
            "taskid": "on_start",
494
            "url": "data:,on_start",
495
            "process": {
496
                "callback": "on_start",
497
            },
498
        })
499
500
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Serve the scheduler control API over XML-RPC until quit() is called.'''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            '''Return a snapshot of counter window `_time` rendered as `_type`.'''
            try:
                return self._cnt[_time].to_dict(_type)
            except Exception:
                # FIX: was a bare `except:`, which would also swallow
                # SystemExit/KeyboardInterrupt; narrowed, behaviour otherwise
                # unchanged (log and return None).
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            '''Submit a task; True when it passed verification and was queued.'''
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            '''Force a projectdb refresh on the next scheduler pass.'''
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            '''Return up to `limit` recent tasks (newest first), restricted to
            whitelisted fields, for one project or for all of them.'''
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # k-way merge by timestamp across the per-project history deques
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip non-whitelisted fields (and track sub-fields) in place
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        # Short timeout so the loop re-checks self._quit twice a second.
        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
    def on_new_request(self, task):
        '''Persist and enqueue a never-before-seen task; returns the task.'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        # one 'pending' event per counter window
        for span in ('5m', '1h', '1d', 'all'):
            self._cnt[span].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived'''
        # Decide whether a request for an already-known taskid restarts the
        # crawl: restart when the itag changed, the last crawl is older than
        # schedule.age, or force_update is set; otherwise drop the request.
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # Windowed counters get a 'pending' event only when the task was not
        # already ACTIVE; the cumulative counter additionally moves it out of
        # its previous terminal state (success/failed -> pending).
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_status(self, task):
        '''Route an arrived status pack to on_task_done / on_task_failed.

        Returns the handled task, or None for malformed / unexpected packs.
        '''
        try:
            procesok = task['track']['process']['ok']
            # done() rejects packs for tasks the queue is not processing
            if not self.task_queue[task['project']].done(task['taskid']):
                # FIX: use the module-level `logger` (was `logging.error`,
                # i.e. the root logger) for consistency with the rest of
                # this file.
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # Feed fetch/process durations into the timing counters when present.
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        # Remember the pack in the project's recent-task history.
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            # auto_recrawl + age keeps the task alive: re-queue it to run
            # again `age` seconds from now instead of finishing it.
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        # cumulative counter: one more success, one fewer pending
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_failed(self, task):
        '''Handle a failed task: retry with back-off or mark it FAILED.

        Called by `on_task_status`; returns the task, or None when its
        schedule cannot be recovered from taskdb.
        '''

        if 'schedule' not in task:
            # Status pack lost its schedule: recover it from taskdb.
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # FIX: use the module logger with lazy %-args (was
                # `logging.error('... %s' % task)` on the root logger),
                # consistent with logging elsewhere in this file.
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        # Per-project retry delays override the defaults; '' is the catch-all.
        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto-recrawl tasks never give up; retry no later than `age`.
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1  # out of retries: mark FAILED below
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # FIX: lazy logger args instead of eager `% task` formatting.
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            # Two-stage formatting is intentional here: the escaped %% survive
            # the first pass so the logger fills project/taskid/url from task.
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        # NOTE(review): `assert` is stripped under `python -O`; an explicit
        # check+raise would be safer if that mode is ever used.
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        # record the dispatch in the project's recent-task history
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
752
from tornado import gen
753
754
755
class OneScheduler(Scheduler):
756
    """
757
    Scheduler Mixin class for one mode
758
759
    overwritten send_task method
760
    call processor.on_task(fetcher.fetch(task)) instead of consuming queue
761
    """
762
763
    def _check_select(self):
        """
        interactive mode of select tasks

        When self.interactive is set, drops into a Python console where the
        user drives task selection by hand via crawl()/run(); otherwise
        delegates to the normal Scheduler._check_select loop.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # truthy markers appended per crawled task; emptied by quit_pyspider()
        # to signal (below) that the whole ioloop should stop
        is_crawled = []

        def run(project=None):
            # convenience wrapper: crawl the project's on_start entry point
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb: when the caller supplied no overrides,
            # prefer the stored task (looked up by taskid, then by url) so
            # its saved schedule/fetch parameters are reused
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            # leave the console so the selected task can actually run
            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            # empty marker list -> the ioloop is stopped below
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.stop()
841
842
    def __getattr__(self, name):
843
        """patch for crawl(url, callback=self.index_page) API"""
844
        if self.interactive:
845
            return name
846
        raise AttributeError(name)
847
848
    def on_task_status(self, task):
849
        """Ignore not processing error in interactive mode"""
850
        if not self.interactive:
851
            super(OneScheduler, self).on_task_status(task)
852
853
        try:
854
            procesok = task['track']['process']['ok']
855
        except KeyError as e:
856
            logger.error("Bad status pack: %s", e)
857
            return None
858
859
        if procesok:
860
            ret = self.on_task_done(task)
861
        else:
862
            ret = self.on_task_failed(task)
863
        if task['track']['fetch'].get('time'):
864
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
865
                                       task['track']['fetch']['time'])
866
        if task['track']['process'].get('time'):
867
            self._cnt['5m_time'].event((task['project'], 'process_time'),
868
                                       task['track']['process'].get('time'))
869
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
870
        return ret
871
872
    def init_one(self, ioloop, fetcher, processor,
873
                 result_worker=None, interactive=False):
874
        self.ioloop = ioloop
875
        self.fetcher = fetcher
876
        self.processor = processor
877
        self.result_worker = result_worker
878
        self.interactive = interactive
879
        self.running_task = 0
880
881
    @gen.coroutine
    def do_task(self, task):
        """Fetch and process a single task inline (one mode).

        Runs the fetch through the tornado coroutine machinery, feeds the
        response to the processor, then drains any follow-up messages and
        results the processor produced.  ``running_task`` tracks in-flight
        work so the interactive select loop knows when to prompt again.
        """
        self.running_task += 1
        try:
            result = yield gen.Task(self.fetcher.fetch, task)
            # BUGFIX: was ``type, task, response`` -- shadowed the builtin
            _type, task, response = result.args
            self.processor.on_task(task, response)
            # do with message: follow-up tasks the processor queued for itself
            while not self.processor.inqueue.empty():
                _task, _response = self.processor.inqueue.get()
                self.processor.on_task(_task, _response)
            # do with results: hand them to the result worker, if present
            while not self.processor.result_queue.empty():
                _task, _result = self.processor.result_queue.get()
                if self.result_worker:
                    self.result_worker.on_result(_task, _result)
        finally:
            # BUGFIX: decrement in finally -- an exception in fetch/process
            # used to leak the counter, stalling the interactive loop forever
            self.running_task -= 1
897
898
    def send_task(self, task, force=True):
899
        if self.fetcher.http_client.free_size() <= 0:
900
            if force:
901
                self._send_buffer.appendleft(task)
902
            else:
903
                raise self.outqueue.Full
904
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())
905
906
    def run(self):
        """Drive the scheduler from a tornado IOLoop instead of a thread loop.

        run_once is polled every 100 ms via a PeriodicCallback; blocks in
        ioloop.start() until quit() stops the loop.
        """
        import tornado.ioloop
        poller = tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                                 io_loop=self.ioloop)
        poller.start()
        self.ioloop.start()
911
912
    def quit(self):
        """Stop the IOLoop, unblocking run(), and log the shutdown."""
        self.ioloop.stop()
        logger.info("scheduler exiting...")
915