Completed
Push — master ( 05f124...f2ad1f )
by Roy
01:15
created

Scheduler.task_verify()   B

Complexity

Conditions 6

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 19
rs 8
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from pyspider.libs.base_handler import BaseHandler
21
from .task_queue import TaskQueue
22
23
logger = logging.getLogger('scheduler')
24
25
26
class Project(object):
    '''
    Scheduler-side state for one project: its in-memory task queue, the
    recently finished tasks (``active_tasks``) and the auto-pause state
    machine exposed through the ``paused`` property.
    '''
    def __init__(self, scheduler, project_info):
        '''
        scheduler: owning Scheduler; supplies ACTIVE_TASKS, TASK_PACK,
                   FAIL_PAUSE_NUM, PAUSE_TIME, UNPAUSE_CHECK_NUM.
        project_info: projectdb record, applied via self.update().
        '''
        self.scheduler = scheduler

        # most recent (time, task) status packs, newest first
        self.active_tasks = deque(maxlen=scheduler.ACTIVE_TASKS)
        self.task_queue = TaskQueue()
        self.task_loaded = False
        self._send_finished_event = False

        self.md5sum = None
        self._send_on_get_info = False
        self.waiting_get_info = True

        # auto-pause state machine: False -> True -> 'checking' -> False/True
        self._paused = False
        self._paused_time = 0
        self._unpause_last_seen = None

        self.update(project_info)

    @property
    def paused(self):
        '''
        True while the project is auto-paused after repeated failures.

        unpaused --(last FAIL_PAUSE_NUM task failed)--> paused --(PAUSE_TIME)--> unpause_checking
                                unpaused <--(last UNPAUSE_CHECK_NUM task have success)--|
                                    paused <--(last UNPAUSE_CHECK_NUM task no success)--|
        '''
        if not self._paused:
            fail_cnt = 0
            for _, task in self.active_tasks:
                # ignore select task
                if task.get('type') == self.scheduler.TASK_PACK:
                    continue
                if 'process' not in task['track']:
                    # malformed status pack: skip it instead of raising
                    # KeyError on the dereference below
                    logger.error('process not in task, %r', task)
                    continue
                if task['track']['process']['ok']:
                    break
                else:
                    fail_cnt += 1
                if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM:
                    break
            if fail_cnt >= self.scheduler.FAIL_PAUSE_NUM:
                self._paused = True
                self._paused_time = time.time()
        elif self._paused is True and (self._paused_time + self.scheduler.PAUSE_TIME < time.time()):
            self._paused = 'checking'
            self._unpause_last_seen = self.active_tasks[0][1] if len(self.active_tasks) else None
        elif self._paused == 'checking':
            cnt = 0
            fail_cnt = 0
            for _, task in self.active_tasks:
                # only look at packs newer than the snapshot taken when
                # checking started
                if task is self._unpause_last_seen:
                    break
                # ignore select task
                if task.get('type') == self.scheduler.TASK_PACK:
                    continue
                if 'process' not in task['track']:
                    # same guard as above: don't let a malformed pack raise
                    logger.error('process not in task, %r', task)
                    continue
                cnt += 1
                if task['track']['process']['ok']:
                    # break with enough check cnt
                    cnt = self.scheduler.UNPAUSE_CHECK_NUM
                    break
                else:
                    fail_cnt += 1
            if cnt >= self.scheduler.UNPAUSE_CHECK_NUM:
                if fail_cnt == cnt:
                    self._paused = True
                    self._paused_time = time.time()
                else:
                    self._paused = False

        return self._paused is True

    def update(self, project_info):
        '''Apply a fresh projectdb record; refresh md5sum and rate/burst.'''
        self.project_info = project_info

        self.name = project_info['name']
        self.group = project_info['group']
        self.db_status = project_info['status']
        self.updatetime = project_info['updatetime']

        md5sum = utils.md5string(project_info['script'])
        # script changed (or runtime info never fetched) while active:
        # schedule a new _on_get_info round-trip through the processor
        if (self.md5sum != md5sum or self.waiting_get_info) and self.active:
            self._send_on_get_info = True
            self.waiting_get_info = True
        self.md5sum = md5sum

        if self.active:
            self.task_queue.rate = project_info['rate']
            self.task_queue.burst = project_info['burst']
        else:
            # inactive projects must not dispatch anything
            self.task_queue.rate = 0
            self.task_queue.burst = 0

        logger.info('project %s updated, status:%s, paused:%s, %d tasks',
                    self.name, self.db_status, self.paused, len(self.task_queue))

    def on_get_info(self, info):
        '''Store runtime info (min_tick etc.) reported back by the processor.'''
        self.waiting_get_info = False
        self.min_tick = info.get('min_tick', 0)
        self.retry_delay = info.get('retry_delay', {})
        self.crawl_config = info.get('crawl_config', {})

    @property
    def active(self):
        # the project dispatches tasks only in RUNNING or DEBUG status
        return self.db_status in ('RUNNING', 'DEBUG')
133
134
135
class Scheduler(object):
136
    UPDATE_PROJECT_INTERVAL = 5 * 60
137
    default_schedule = {
138
        'priority': 0,
139
        'retries': 3,
140
        'exetime': 0,
141
        'age': -1,
142
        'itag': None,
143
    }
144
    LOOP_LIMIT = 1000
145
    LOOP_INTERVAL = 0.1
146
    ACTIVE_TASKS = 100
147
    INQUEUE_LIMIT = 0
148
    EXCEPTION_LIMIT = 3
149
    DELETE_TIME = 24 * 60 * 60
150
    DEFAULT_RETRY_DELAY = {
151
        0: 30,
152
        1: 1*60*60,
153
        2: 6*60*60,
154
        3: 12*60*60,
155
        '': 24*60*60
156
    }
157
    FAIL_PAUSE_NUM = 10
158
    PAUSE_TIME = 5*60
159
    UNPAUSE_CHECK_NUM = 3
160
161
    TASK_PACK = 1
162
    STATUS_PACK = 2  # current not used
163
    REQUEST_PACK = 3  # current not used
164
165
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''
        taskdb / projectdb / resultdb: database accessors (resultdb optional).
        newtask_queue: queue of incoming new-crawl requests.
        status_queue: finished-task status packs coming back from processing.
        out_queue: tasks dispatched to the fetcher.
        data_path: directory where counter snapshots are loaded/persisted.
        '''
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()  # tasks parked while out_queue is full
        self._quit = False
        self._exceptions = 0  # consecutive run_once failures (see run())
        self.projects = dict()  # project name -> Project
        self._force_update_project = False
        self._last_update_project = 0
        self._last_tick = int(time.time())
        self._postpone_request = []  # force_update requests delayed while task is processing

        # event counters over several time windows; window sizes are
        # (bucket seconds, bucket count)
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore persisted counters from a previous run
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
200
201
    def _update_projects(self):
202
        '''Check project update'''
203
        now = time.time()
204
        if (
205
                not self._force_update_project
206
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
207
        ):
208
            return
209
        for project in self.projectdb.check_update(self._last_update_project):
210
            self._update_project(project)
211
            logger.debug("project: %s updated.", project['name'])
212
        self._force_update_project = False
213
        self._last_update_project = now
214
215
    get_info_attributes = ['min_tick', 'retry_delay', 'crawl_config']
216
217
    def _update_project(self, project):
        '''Create or refresh the in-memory Project for one projectdb record.

        Also triggers the _on_get_info round-trip when the script changed,
        loads/drops the task queue on status transitions, and seeds the
        all-time counters for inactive projects.
        '''
        if project['name'] not in self.projects:
            self.projects[project['name']] = Project(self, project)
        else:
            self.projects[project['name']].update(project)

        project = self.projects[project['name']]

        if project._send_on_get_info:
            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            project._send_on_get_info = False
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project.name,
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': self.get_info_attributes,
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })

        # load task queue when project is running and delete task_queue when project is stoped
        if project.active:
            if not project.task_loaded:
                self._load_tasks(project)
                project.task_loaded = True
        else:
            if project.task_loaded:
                project.task_queue = TaskQueue()
                project.task_loaded = False

            # counters are keyed by project *name* (cf. _check_delete /
            # _print_counter_log); testing the Project object would never match
            if project.name not in self._cnt['all']:
                self._update_project_cnt(project.name)
255
256
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
257
258
    def _load_tasks(self, project):
259
        '''load tasks from database'''
260
        task_queue = project.task_queue
261
262
        for task in self.taskdb.load_tasks(
263
                self.taskdb.ACTIVE, project.name, self.scheduler_task_fields
264
        ):
265
            taskid = task['taskid']
266
            _schedule = task.get('schedule', self.default_schedule)
267
            priority = _schedule.get('priority', self.default_schedule['priority'])
268
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
269
            task_queue.put(taskid, priority, exetime)
270
        project.task_loaded = True
271
        logger.debug('project: %s loaded %d tasks.', project.name, len(task_queue))
272
273
        if project not in self._cnt['all']:
274
            self._update_project_cnt(project)
275
        self._cnt['all'].value((project.name, 'pending'), len(project.task_queue))
276
277
    def _update_project_cnt(self, project_name):
278
        status_count = self.taskdb.status_count(project_name)
279
        self._cnt['all'].value(
280
            (project_name, 'success'),
281
            status_count.get(self.taskdb.SUCCESS, 0)
282
        )
283
        self._cnt['all'].value(
284
            (project_name, 'failed'),
285
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
286
        )
287
        self._cnt['all'].value(
288
            (project_name, 'pending'),
289
            status_count.get(self.taskdb.ACTIVE, 0)
290
        )
291
292
    def task_verify(self, task):
293
        '''
294
        return False if any of 'taskid', 'project', 'url' is not in task dict
295
                        or project in not in task_queue
296
        '''
297
        for each in ('taskid', 'project', 'url', ):
298
            if each not in task or not task[each]:
299
                logger.error('%s not in task: %.200r', each, task)
300
                return False
301
        if task['project'] not in self.projects:
302
            logger.error('unknown project: %s', task['project'])
303
            return False
304
305
        project = self.projects[task['project']]
306
        if not project.active:
307
            logger.error('project %s not started, please set status to RUNNING or DEBUG',
308
                         task['project'])
309
            return False
310
        return True
311
312
    def insert_task(self, task):
313
        '''insert task into database'''
314
        return self.taskdb.insert(task['project'], task['taskid'], task)
315
316
    def update_task(self, task):
317
        '''update task in database'''
318
        return self.taskdb.update(task['project'], task['taskid'], task)
319
320
    def put_task(self, task):
321
        '''put task to task queue'''
322
        _schedule = task.get('schedule', self.default_schedule)
323
        self.projects[task['project']].task_queue.put(
324
            task['taskid'],
325
            priority=_schedule.get('priority', self.default_schedule['priority']),
326
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
327
        )
328
329
    def send_task(self, task, force=True):
330
        '''
331
        dispatch task to fetcher
332
333
        out queue may have size limit to prevent block, a send_buffer is used
334
        '''
335
        try:
336
            self.out_queue.put_nowait(task)
337
        except Queue.Full:
338
            if force:
339
                self._send_buffer.appendleft(task)
340
            else:
341
                raise
342
343
    def _check_task_done(self):
344
        '''Check status queue'''
345
        cnt = 0
346
        try:
347
            while True:
348
                task = self.status_queue.get_nowait()
349
                # check _on_get_info result here
350
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
351
                    if task['project'] not in self.projects:
352
                        continue
353
                    project = self.projects[task['project']]
354
                    project.on_get_info(task['track'].get('save') or {})
355
                    logger.info(
356
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
357
                    )
358
                    continue
359
                elif not self.task_verify(task):
360
                    continue
361
                self.on_task_status(task)
362
                cnt += 1
363
        except Queue.Empty:
364
            pass
365
        return cnt
366
367
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
368
369
    def _check_request(self):
        '''Consume the new-task queue (and retry postponed force_updates).

        Returns the number of distinct tasks forwarded to on_request.
        '''
        # check _postpone_request first: requests delayed because the same
        # taskid was still being processed when the force_update arrived
        todo = []
        for task in self._postpone_request:
            if task['project'] not in self.projects:
                continue
            if self.projects[task['project']].task_queue.is_processing(task['taskid']):
                # still busy -- keep postponing
                todo.append(task)
            else:
                self.on_request(task)
        self._postpone_request = todo

        # drain up to LOOP_LIMIT distinct taskids from the new-task queue,
        # deduplicating within this batch
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue entries may be a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued for this project: only force_update may re-enter
                if task['taskid'] in self.projects[task['project']].task_queue:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: last force_update wins
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
413
414
    def _check_cronjob(self):
415
        """Check projects cronjob tick, return True when a new tick is sended"""
416
        now = time.time()
417
        self._last_tick = int(self._last_tick)
418
        if now - self._last_tick < 1:
419
            return False
420
        self._last_tick += 1
421
        for project in itervalues(self.projects):
422
            if not project.active:
423
                continue
424
            if project.waiting_get_info:
425
                continue
426
            if project.min_tick == 0:
427
                continue
428
            if self._last_tick % int(project.min_tick) != 0:
429
                continue
430
            self.on_select_task({
431
                'taskid': '_on_cronjob',
432
                'project': project.name,
433
                'url': 'data:,_on_cronjob',
434
                'status': self.taskdb.SUCCESS,
435
                'fetch': {
436
                    'save': {
437
                        'tick': self._last_tick,
438
                    },
439
                },
440
                'process': {
441
                    'callback': '_on_cronjob',
442
                },
443
            })
444
        return True
445
446
    request_task_fields = [
447
        'taskid',
448
        'project',
449
        'url',
450
        'status',
451
        'schedule',
452
        'fetch',
453
        'process',
454
        'track',
455
        'lastcrawltime'
456
    ]
457
458
    def _check_select(self):
        '''Select tasks to fetch & process.

        Flushes the send buffer first, then pulls up to LOOP_LIMIT taskids
        (at most LOOP_LIMIT/10 per project) from the per-project queues and
        dispatches them. Returns {project_name: number selected}.
        '''
        # retry tasks parked while out_queue was full, oldest first
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            # fetcher is saturated; select nothing this round
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project in itervalues(self.projects):
            if not project.active:
                continue
            # only check project pause when select new tasks, cronjob and new request still working
            if project.paused:
                continue
            if project.waiting_get_info:
                continue
            if cnt >= limit:
                break

            # task queue
            task_queue = project.task_queue
            task_queue.check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project.name, taskid))
                project_cnt += 1
                cnt += 1

            cnt_dict[project.name] = project_cnt
            if project_cnt:
                # tasks were selected: arm the on_finished notification for
                # when the queue eventually drains
                project._send_finished_event = True
            # check and send finished event to project
            elif len(task_queue) == 0 and project._send_finished_event:
                project._send_finished_event = False
                self.on_select_task({
                    'taskid': 'on_finished',
                    'project': project.name,
                    'url': 'data:,on_finished',
                    'status': self.taskdb.SUCCESS,
                    'process': {
                        'callback': 'on_finished',
                    },
                })

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
522
523
    def _load_put_task(self, project, taskid):
524
        try:
525
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
526
        except ValueError:
527
            logger.error('bad task pack %s:%s', project, taskid)
528
            return
529
        if not task:
530
            return
531
        task = self.on_select_task(task)
532
533
    def _print_counter_log(self):
534
        # print top 5 active counters
535
        keywords = ('pending', 'success', 'retry', 'failed')
536
        total_cnt = {}
537
        project_actives = []
538
        project_fails = []
539
        for key in keywords:
540
            total_cnt[key] = 0
541
        for project, subcounter in iteritems(self._cnt['5m']):
542
            actives = 0
543
            for key in keywords:
544
                cnt = subcounter.get(key, None)
545
                if cnt:
546
                    cnt = cnt.sum
547
                    total_cnt[key] += cnt
548
                    actives += cnt
549
550
            project_actives.append((actives, project))
551
552
            fails = subcounter.get('failed', None)
553
            if fails:
554
                project_fails.append((fails.sum, project))
555
556
        top_2_fails = sorted(project_fails, reverse=True)[:2]
557
        top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails],
558
                               reverse=True)[:5 - len(top_2_fails)]
559
560
        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
561
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
562
        for _, project in itertools.chain(top_3_actives, top_2_fails):
563
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
564
            log_str += " %s:%d,%d,%d,%d" % (project,
565
                                            subcounter.get('pending', 0),
566
                                            subcounter.get('success', 0),
567
                                            subcounter.get('retry', 0),
568
                                            subcounter.get('failed', 0))
569
        logger.info(log_str)
570
571
    def _dump_cnt(self):
572
        '''Dump counters to file'''
573
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
574
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
575
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
576
577
    def _try_dump_cnt(self):
578
        '''Dump counters every 60 seconds'''
579
        now = time.time()
580
        if now - self._last_dump_cnt > 60:
581
            self._last_dump_cnt = now
582
            self._dump_cnt()
583
            self._print_counter_log()
584
585
    def _check_delete(self):
586
        '''Check project delete'''
587
        now = time.time()
588
        for project in list(itervalues(self.projects)):
589
            if project.db_status != 'STOP':
590
                continue
591
            if now - project.updatetime < self.DELETE_TIME:
592
                continue
593
            if 'delete' not in self.projectdb.split_group(project.group):
594
                continue
595
596
            logger.warning("deleting project: %s!", project.name)
597
            del self.projects[project.name]
598
            self.taskdb.drop(project.name)
599
            self.projectdb.drop(project.name)
600
            if self.resultdb:
601
                self.resultdb.drop(project.name)
602
            for each in self._cnt.values():
603
                del each[project.name]
604
605
    def __len__(self):
606
        return sum(len(x.task_queue) for x in itervalues(self.projects))
607
608
    def quit(self):
609
        '''Set quit signal'''
610
        self._quit = True
611
        # stop xmlrpc server
612
        if hasattr(self, 'xmlrpc_server'):
613
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
614
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
615
616
    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''

        # the order matters: refresh projects before consuming their queues,
        # and select/dispatch only after all new input has been absorbed
        self._update_projects()
        self._check_task_done()
        self._check_request()
        # _check_cronjob advances one second per call; loop until caught up
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
627
628
    def run(self):
        '''Start scheduler loop.

        Runs run_once every LOOP_INTERVAL seconds until quit() is called,
        Ctrl-C is received, or more than EXCEPTION_LIMIT consecutive
        iterations fail. Counters are dumped once on the way out.
        '''
        logger.info("scheduler starting...")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a clean iteration resets the consecutive-failure budget
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()
648
649
    def trigger_on_start(self, project):
650
        '''trigger an on_start callback of project'''
651
        self.newtask_queue.put({
652
            "project": project,
653
            "taskid": "on_start",
654
            "url": "data:,on_start",
655
            "process": {
656
                "callback": "on_start",
657
            },
658
        })
659
660
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface.

        Registers the control/introspection endpoints on a WSGI XML-RPC
        application and serves it with a dedicated tornado ioloop. Blocks
        until quit() stops the ioloop.
        '''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # returns counter window `_time` ('5m', '1h', ...) aggregated by
            # `_type` ('sum'/'avg'); returns None on any failure
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force _update_projects to run on the next scheduler iteration
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # whitelist of task keys exposed over the rpc interface
            allowed_keys = set((
                'type',
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            # merge the per-project active_tasks deques (each newest-first)
            # into one newest-first stream, stripping non-whitelisted keys
            iters = [iter(x.active_tasks) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        def get_projects_pause_status():
            result = {}
            for project_name, project in iteritems(self.projects):
                result[project_name] = project.paused
            return result
        application.register_function(get_projects_pause_status, 'get_projects_pause_status')

        def webui_update():
            # one-shot snapshot used by the web ui dashboard
            return {
                'pause_status': get_projects_pause_status(),
                'counter': {
                    '5m_time': dump_counter('5m_time', 'avg'),
                    '5m': dump_counter('5m', 'sum'),
                    '1h': dump_counter('1h', 'sum'),
                    '1d': dump_counter('1d', 'sum'),
                    'all': dump_counter('all', 'sum'),
                },
            }
        application.register_function(webui_update, 'webui_update')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        logger.info('scheduler.xmlrpc listening on %s:%s', bind, port)
        self.xmlrpc_ioloop.start()
767
768
    def on_request(self, task):
769
        if self.INQUEUE_LIMIT and len(self.projects[task['project']].task_queue) >= self.INQUEUE_LIMIT:
770
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
771
            return
772
773
        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
774
                                       fields=self.merge_task_fields)
775
        if oldtask:
776
            return self.on_old_request(task, oldtask)
777
        else:
778
            return self.on_new_request(task)
779
780
    def on_new_request(self, task):
781
        '''Called when a new request is arrived'''
782
        task['status'] = self.taskdb.ACTIVE
783
        self.insert_task(task)
784
        self.put_task(task)
785
786
        project = task['project']
787
        self._cnt['5m'].event((project, 'pending'), +1)
788
        self._cnt['1h'].event((project, 'pending'), +1)
789
        self._cnt['1d'].event((project, 'pending'), +1)
790
        self._cnt['all'].event((project, 'pending'), +1)
791
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
792
        return task
793
794
    def on_old_request(self, task, old_task):
        '''Called when a request arrives for a task that already exists.

        Decides whether to restart the crawl (itag change, age expiry, or
        force_update), cancel it, or ignore the request. Returns the task
        when it was restarted/cancelled, None otherwise.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        if _schedule.get('force_update') and self.projects[task['project']].task_queue.is_processing(task['taskid']):
            # when a task is in processing, the modify may conflict with the running task.
            # postpone the modify after task finished.
            logger.info('postpone modify task %(project)s:%(taskid)s %(url)s', task)
            self._postpone_request.append(task)
            return

        # restart when the itag changed, the last crawl is older than
        # 'age', or the caller explicitly forced an update
        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        if _schedule.get('cancel'):
            # mark BAD and pull the task out of the queue instead of re-crawling
            logger.info('cancel task %(project)s:%(taskid)s %(url)s', task)
            task['status'] = self.taskdb.BAD
            self.update_task(task)
            self.projects[task['project']].task_queue.delete(task['taskid'])
            return task

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        # move the task between counter buckets according to its old status
        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
843
844
    def on_task_status(self, task):
845
        '''Called when a status pack is arrived'''
846
        try:
847
            procesok = task['track']['process']['ok']
848
            if not self.projects[task['project']].task_queue.done(task['taskid']):
849
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
850
                return None
851
        except KeyError as e:
852
            logger.error("Bad status pack: %s", e)
853
            return None
854
855
        if procesok:
856
            ret = self.on_task_done(task)
857
        else:
858
            ret = self.on_task_failed(task)
859
860
        if task['track']['fetch'].get('time'):
861
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
862
                                       task['track']['fetch']['time'])
863
        if task['track']['process'].get('time'):
864
            self._cnt['5m_time'].event((task['project'], 'process_time'),
865
                                       task['track']['process'].get('time'))
866
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
867
        return ret
868
869
    def on_task_done(self, task):
        '''Called when a task finished successfully (via `on_task_status`).

        Persists the SUCCESS status, re-queues the task when auto_recrawl
        is enabled, and updates the success counters.
        '''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            schedule = task['schedule']
            if schedule.get('auto_recrawl') and 'age' in schedule:
                # auto_recrawl: keep the task ACTIVE and schedule the next
                # crawl `age` seconds from now.
                task['status'] = self.taskdb.ACTIVE
                schedule['exetime'] = time.time() + schedule.get('age')
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        for window in ('5m', '1h', '1d'):
            self._cnt[window].event((project, 'success'), +1)
        # The all-time counter also moves the task out of "pending".
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
    def on_task_failed(self, task):
        '''Called when a task failed (via `on_task_status`).

        Looks up the retry policy (per-project retry_delay table, falling
        back to DEFAULT_RETRY_DELAY), then either marks the task FAILED
        (retries exhausted) or re-queues it with an increased retried
        count and a delayed exetime.
        '''

        if 'schedule' not in task:
            # Status pack without a schedule: recover it from taskdb.
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # Fixed: was `logging.error(... % task)` — use the module
                # logger with lazy %-args like the rest of the file.
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        project_info = self.projects[task['project']]
        retry_delay = project_info.retry_delay or self.DEFAULT_RETRY_DELAY
        # Delay table is keyed by retry count; '' is the fallback entry.
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto_recrawl tasks never give up; retry no later than `age`.
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                # Retries exhausted: signal the FAILED branch below.
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # Fixed: was eager '%' interpolation (`'...' % task`); use lazy
            # logging args for consistency with the other handlers.
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process.

        Stamps project metadata onto the task pack, lazily joins the
        project-level crawl_config, records it as an active task and
        hands it to the fetcher via send_task.
        '''
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        assert project_info, 'no such project'
        # Inject information about the owning project.
        task.update({
            'type': self.TASK_PACK,
            'group': project_info.group,
            'project_md5sum': project_info.md5sum,
            'project_updatetime': project_info.updatetime,
        })

        # lazy join project.crawl_config
        crawl_config = getattr(project_info, 'crawl_config', None)
        if crawl_config:
            task = BaseHandler.task_join_crawl_config(task, crawl_config)

        project_info.active_tasks.appendleft((time.time(), task))
        self.send_task(task)
        return task
from tornado import gen
967
968
969
class OneScheduler(Scheduler):
    """
    Scheduler for "one" mode (single-process pyspider).

    Overrides send_task: instead of pushing to the fetcher queue, it calls
    processor.on_task(fetcher.fetch(task)) inline on the shared ioloop.
    """

    def _check_select(self):
        """
        Interactive mode of selecting tasks: opens a Python console
        exposing run()/crawl()/quit_interactive()/quit_pyspider().
        Falls back to the normal queue-driven selection when not interactive.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # Non-empty means a task was selected this round; emptied by
        # quit_pyspider() to signal a full shutdown below.
        is_crawled = []

        def run(project=None):
            # Convenience wrapper: crawl the project's on_start entry point.
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # get task package by running the handler's crawl() builder
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # check task in taskdb: with no explicit kwargs, prefer the
            # stored task (looked up by taskid, then by url)
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        banner = (
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        # Newer console objects take the banner via show_banner();
        # older ones take it as an argument to interact().
        if hasattr(shell, 'show_banner'):
            shell.show_banner(banner)
            shell.interact()
        else:
            shell.interact(banner)
        if not is_crawled:
            # Nothing selected (quit_pyspider): stop the shared ioloop.
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        # In interactive mode unknown attributes resolve to their own name,
        # so `self.index_page` becomes the string 'index_page' for crawl().
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # NOTE(review): no `return` here, so after the parent handles the
            # pack the code below runs again (double counting on_task_done /
            # on_task_failed) — looks unintentional; confirm before changing.
            super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        # Wire the in-process components together; running_task gates
        # _check_select() while a fetch/process cycle is in flight.
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        # Run one full fetch -> process -> result cycle inline.
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        # result.args is the fetcher callback tuple; `type` shadows the
        # builtin here but is unused afterwards.
        type, task, response = result.args
        self.processor.on_task(task, response)
        # do with message: follow-up tasks the processor queued for itself
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        # NOTE(review): when the http client is saturated and force=True the
        # task is buffered but execution still falls through to add_future,
        # so it appears to be dispatched as well — confirm intent.
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                self._send_buffer.appendleft(task)
            else:
                raise self.outqueue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        import tornado.ioloop
        # Drive the scheduler loop from the shared ioloop (every 100 ms).
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")
1136
import random
1137
import threading
1138
1139
1140
class ThreadBaseScheduler(Scheduler):
    '''
    Scheduler that fans request/status handling out to a pool of worker
    threads. Each thread gets its own copy of the taskdb/projectdb/resultdb
    connections (via thread-local storage), and packs for the same taskid
    are hashed to the same thread so their processing stays ordered.
    '''

    def __init__(self, threads=4, *args, **kwargs):
        # threads: size of the worker pool
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # Keep the connections created by the parent __init__ as templates;
        # the properties below hand each thread its own copy().
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # Lazily clone the template connection for the calling thread.
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # Lazily clone the template connection for the calling thread.
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # Lazily clone the template connection for the calling thread.
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        # One daemon worker per thread, each draining its own queue.
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        # Worker loop: execute queued (method, args, kwargs) triples forever,
        # logging (not propagating) any exception so the worker survives.
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        '''Dispatch `method` to a worker thread.

        kwargs may carry two private options (popped before dispatch):
        _i     - affinity key; same value always maps to the same thread.
        _block - wait for all queues to drain after enqueueing.
        '''
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            # No affinity: prefer an idle thread; otherwise either wait
            # (_block) or fall back to a random busy one.
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    # for/else: no idle queue found.
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        # Busy-wait until every worker queue is drained.
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        # Any thread may handle a project update.
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # Hash on taskid so status packs for one task stay ordered.
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        # Hash on taskid so requests for one task stay ordered.
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        # Same affinity rule as on_request/on_task_status.
        i = hash(taskid)
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        # Drain the workers so each scheduler tick observes a settled state.
        self._wait_thread()