Completed
Push — master ( d27a46...273c66 )
by Roy
01:17
created

Scheduler._check_delete()   B

Complexity

Conditions 7

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 19
rs 7.3333
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from .task_queue import TaskQueue
21
22
logger = logging.getLogger('scheduler')
23
24
25
class Project(object):
    '''
    project for scheduler

    In-memory mirror of one projectdb record plus the scheduler-side runtime
    state (task queue, recent tasks, script-change flags).
    '''
    def __init__(self, project_info, ACTIVE_TASKS=100):
        '''
        project_info: dict loaded from projectdb (name/group/status/script/...)
        ACTIVE_TASKS: max number of recent (timestamp, task) pairs kept
        '''
        self.paused = False

        # most recent (timestamp, task) pairs, newest first
        self.active_tasks = deque(maxlen=ACTIVE_TASKS)
        # created lazily by Scheduler._load_tasks; None while project inactive
        self.task_queue = None
        self._send_finished_event = False

        # md5 of the currently loaded project script
        self.md5sum = None
        # True -> scheduler should emit an _on_get_info task for this project
        self._send_on_get_info = False
        # True until runtime info (min_tick/retry_delay) has been received
        self.waiting_get_info = True

        self.update(project_info)

    def update(self, project_info):
        # Refresh cached fields from a (possibly changed) projectdb record.
        self.project_info = project_info

        self.name = project_info['name']
        self.group = project_info['group']
        self.db_status = project_info['status']
        self.updatetime = project_info['updatetime']

        md5sum = utils.md5string(project_info['script'])
        # script changed (or info never fetched) -> ask the processor for
        # fresh runtime info via an _on_get_info task
        if (self.md5sum != md5sum or self.waiting_get_info) and self.active:
            self._send_on_get_info = True
            self.waiting_get_info = True
        self.md5sum = md5sum

        # keep rate/burst in sync; an inactive project gets a frozen queue
        if self.task_queue:
            if self.active:
                self.task_queue.rate = project_info['rate']
                self.task_queue.burst = project_info['burst']
            else:
                self.task_queue.rate = 0
                self.task_queue.burst = 0

    def on_get_info(self, info):
        # Called with the `save` payload of an _on_get_info status pack.
        self.waiting_get_info = False
        self.min_tick = info.get('min_tick', 0)
        self.retry_delay = info.get('retry_delay', {})

    @property
    def active(self):
        # active = RUNNING/DEBUG in the database and not locally paused
        return self.db_status in ('RUNNING', 'DEBUG') and not self.paused
74
75
76
class Scheduler(object):
    # seconds between polls of projectdb for changed projects
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # defaults merged into task['schedule'] when fields are missing
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000        # max tasks handled per run_once pass
    LOOP_INTERVAL = 0.1      # seconds slept between run_once passes
    ACTIVE_TASKS = 100       # per-project recent-task history size
    INQUEUE_LIMIT = 0        # max queued tasks per project (0 = unlimited)
    EXCEPTION_LIMIT = 3      # consecutive loop exceptions tolerated in run()
    DELETE_TIME = 24 * 60 * 60   # grace period before a STOPped project is deleted
    # delay in seconds before the n-th retry; '' is the fallback entry
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
98
99
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''
        taskdb / projectdb / resultdb -- storage backends (resultdb optional)
        newtask_queue -- incoming new/updated crawl requests
        status_queue  -- status packs reported back by the processor
        out_queue     -- tasks dispatched to the fetcher
        data_path     -- directory for persisted counter snapshots
        '''
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        self._send_buffer = deque()          # tasks waiting for room in out_queue
        self._quit = False
        self._exceptions = 0                 # consecutive run-loop failures
        self.projects = dict()               # project name -> Project
        self._force_update_project = False
        self._last_update_project = 0
        self._last_tick = int(time.time())   # cronjob tick cursor (whole seconds)
        self._postpone_request = []          # force_update requests deferred while processing

        # event counters over several time windows
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # long-lived counters survive restarts via snapshot files
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
134
135
    def _update_projects(self):
136
        '''Check project update'''
137
        now = time.time()
138
        if (
139
                not self._force_update_project
140
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
141
        ):
142
            return
143
        for project in self.projectdb.check_update(self._last_update_project):
144
            self._update_project(project)
145
            logger.debug("project: %s updated.", project['name'])
146
        self._force_update_project = False
147
        self._last_update_project = now
148
149
    def _update_project(self, project):
150
        '''update one project'''
151
        if project['name'] not in self.projects:
152
            self.projects[project['name']] = Project(project, ACTIVE_TASKS=self.ACTIVE_TASKS)
153
        else:
154
            self.projects[project['name']].update(project)
155
156
        project = self.projects[project['name']]
157
158
        if project._send_on_get_info:
159
            # update project runtime info from processor by sending a _on_get_info
160
            # request, result is in status_page.track.save
161
            project._send_on_get_info = False
162
            self.on_select_task({
163
                'taskid': '_on_get_info',
164
                'project': project.name,
165
                'url': 'data:,_on_get_info',
166
                'status': self.taskdb.SUCCESS,
167
                'fetch': {
168
                    'save': ['min_tick', 'retry_delay'],
169
                },
170
                'process': {
171
                    'callback': '_on_get_info',
172
                },
173
            })
174
175
        # load task queue when project is running and delete task_queue when project is stoped
176
        if project.active and not project.task_queue:
177
            self._load_tasks(project)
178
            project.task_queue.rate = project.project_info['rate']
179
            project.task_queue.burst = project.project_info['burst']
180
181
        if not project.active:
182
            project.task_queue = None
183
184
            if project not in self._cnt['all']:
185
                self._update_project_cnt(project.name)
186
187
    # fields fetched from taskdb when (re)loading a project's task queue
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
188
189
    def _load_tasks(self, project):
        '''Load a project's ACTIVE tasks from taskdb into a fresh TaskQueue.'''
        task_queue = project.task_queue = TaskQueue()

        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project.name, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            task_queue.put(taskid, priority, exetime)
        project.task_loaded = True
        logger.debug('project: %s loaded %d tasks.', project.name, len(task_queue))

        # BUG FIX: counters are keyed by project *name* everywhere else
        # (_update_project_cnt, on_new_request, ...); the old code used the
        # Project object here, so the membership test never matched and the
        # pending counter was registered under a foreign key
        if project.name not in self._cnt['all']:
            self._update_project_cnt(project.name)
        self._cnt['all'].value((project.name, 'pending'), len(project.task_queue))
207
208
    def _update_project_cnt(self, project):
209
        status_count = self.taskdb.status_count(project)
210
        self._cnt['all'].value(
211
            (project, 'success'),
212
            status_count.get(self.taskdb.SUCCESS, 0)
213
        )
214
        self._cnt['all'].value(
215
            (project, 'failed'),
216
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
217
        )
218
        self._cnt['all'].value(
219
            (project, 'pending'),
220
            status_count.get(self.taskdb.ACTIVE, 0)
221
        )
222
223
    def task_verify(self, task):
224
        '''
225
        return False if any of 'taskid', 'project', 'url' is not in task dict
226
                        or project in not in task_queue
227
        '''
228
        for each in ('taskid', 'project', 'url', ):
229
            if each not in task or not task[each]:
230
                logger.error('%s not in task: %.200r', each, task)
231
                return False
232
        if task['project'] not in self.projects:
233
            logger.error('unknown project: %s', task['project'])
234
            return False
235
236
        project = self.projects[task['project']]
237
        if not project.active:
238
            if project.paused:
239
                logger.error('project %s paused', task['project'])
240
            else:
241
                logger.error('project %s not started, please set status to RUNNING or DEBUG',
242
                             task['project'])
243
            return False
244
        return True
245
246
    def insert_task(self, task):
247
        '''insert task into database'''
248
        return self.taskdb.insert(task['project'], task['taskid'], task)
249
250
    def update_task(self, task):
251
        '''update task in database'''
252
        return self.taskdb.update(task['project'], task['taskid'], task)
253
254
    def put_task(self, task):
255
        '''put task to task queue'''
256
        _schedule = task.get('schedule', self.default_schedule)
257
        self.projects[task['project']].task_queue.put(
258
            task['taskid'],
259
            priority=_schedule.get('priority', self.default_schedule['priority']),
260
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
261
        )
262
263
    def send_task(self, task, force=True):
264
        '''
265
        dispatch task to fetcher
266
267
        out queue may have size limit to prevent block, a send_buffer is used
268
        '''
269
        try:
270
            self.out_queue.put_nowait(task)
271
        except Queue.Full:
272
            if force:
273
                self._send_buffer.appendleft(task)
274
            else:
275
                raise
276
277
    def _check_task_done(self):
        '''Check status queue; returns the number of status packs handled.'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                # (runtime info requested by _update_project comes back as a
                #  status pack whose track.save carries min_tick/retry_delay)
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    if task['project'] not in self.projects:
                        continue
                    project = self.projects[task['project']]
                    project.on_get_info(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt

    # fields fetched from taskdb when merging an incoming request with an
    # existing task record (see on_request / on_old_request)
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
302
303
    def _check_request(self):
        '''Check new task queue; returns the number of requests handled.'''
        # check _postpone_request first: these are force_update requests that
        # arrived while the same task was still being processed
        todo = []
        for task in self._postpone_request:
            if task['project'] not in self.projects:
                continue
            if self.projects[task['project']].task_queue.is_processing(task['taskid']):
                # still processing -- keep postponing
                todo.append(task)
            else:
                self.on_request(task)
        self._postpone_request = todo

        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued: skip unless the request forces an update
                if task['taskid'] in self.projects[task['project']].task_queue:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: first one wins unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
347
348
    def _check_cronjob(self):
349
        """Check projects cronjob tick, return True when a new tick is sended"""
350
        now = time.time()
351
        self._last_tick = int(self._last_tick)
352
        if now - self._last_tick < 1:
353
            return False
354
        self._last_tick += 1
355
        for project in itervalues(self.projects):
356
            if not project.active:
357
                continue
358
            if project.waiting_get_info:
359
                continue
360
            if project.min_tick == 0:
361
                continue
362
            if self._last_tick % int(project.min_tick) != 0:
363
                continue
364
            self.on_select_task({
365
                'taskid': '_on_cronjob',
366
                'project': project.name,
367
                'url': 'data:,_on_cronjob',
368
                'status': self.taskdb.SUCCESS,
369
                'fetch': {
370
                    'save': {
371
                        'tick': self._last_tick,
372
                    },
373
                },
374
                'process': {
375
                    'callback': '_on_cronjob',
376
                },
377
            })
378
        return True
379
380
    # fields fetched from taskdb when a selected task is sent to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
391
392
    def _check_select(self):
        '''Select task to fetch & process; returns {project name: selected count}.'''
        # flush the overflow buffer first; stop as soon as out_queue is full again
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()           # project name -> number of tasks selected
        limit = self.LOOP_LIMIT
        for project in itervalues(self.projects):
            if not project.active:
                continue
            # only select from projects whose runtime info has arrived
            if project.waiting_get_info:
                continue
            if cnt >= limit:
                break

            # task queue
            task_queue = project.task_queue
            task_queue.check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            # each project gets at most a tenth of the per-pass budget
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project.name, taskid))
                project_cnt += 1
                cnt += 1

            cnt_dict[project.name] = project_cnt
            if project_cnt:
                project._send_finished_event = True
            # check and send finished event to project
            elif len(task_queue) == 0 and project._send_finished_event:
                project._send_finished_event = False
                self.on_select_task({
                    'taskid': 'on_finished',
                    'project': project.name,
                    'url': 'data:,on_finished',
                    'status': self.taskdb.SUCCESS,
                    'process': {
                        'callback': 'on_finished',
                    },
                })

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
453
454
    def _load_put_task(self, project, taskid):
455
        try:
456
            task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
457
        except ValueError:
458
            logger.error('bad task pack %s:%s', project, taskid)
459
            return
460
        if not task:
461
            return
462
        task = self.on_select_task(task)
463
464
    def _print_counter_log(self):
465
        # print top 5 active counters
466
        keywords = ('pending', 'success', 'retry', 'failed')
467
        total_cnt = {}
468
        project_actives = []
469
        project_fails = []
470
        for key in keywords:
471
            total_cnt[key] = 0
472
        for project, subcounter in iteritems(self._cnt['5m']):
473
            actives = 0
474
            for key in keywords:
475
                cnt = subcounter.get(key, None)
476
                if cnt:
477
                    cnt = cnt.sum
478
                    total_cnt[key] += cnt
479
                    actives += cnt
480
481
            project_actives.append((actives, project))
482
483
            fails = subcounter.get('failed', None)
484
            if fails:
485
                project_fails.append((fails.sum, project))
486
487
        top_2_fails = sorted(project_fails, reverse=True)[:2]
488
        top_3_actives = sorted([x for x in project_actives if x[1] not in top_2_fails],
489
                               reverse=True)[:5 - len(top_2_fails)]
490
491
        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
492
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
493
        for _, project in itertools.chain(top_3_actives, top_2_fails):
494
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
495
            log_str += " %s:%d,%d,%d,%d" % (project,
496
                                            subcounter.get('pending', 0),
497
                                            subcounter.get('success', 0),
498
                                            subcounter.get('retry', 0),
499
                                            subcounter.get('failed', 0))
500
        logger.info(log_str)
501
502
    def _dump_cnt(self):
503
        '''Dump counters to file'''
504
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
505
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
506
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
507
508
    def _try_dump_cnt(self):
509
        '''Dump counters every 60 seconds'''
510
        now = time.time()
511
        if now - self._last_dump_cnt > 60:
512
            self._last_dump_cnt = now
513
            self._dump_cnt()
514
            self._print_counter_log()
515
516
    def _check_delete(self):
517
        '''Check project delete'''
518
        now = time.time()
519
        for project in list(itervalues(self.projects)):
520
            if project.db_status != 'STOP':
521
                continue
522
            if now - project.updatetime < self.DELETE_TIME:
523
                continue
524
            if 'delete' not in self.projectdb.split_group(project.group):
525
                continue
526
527
            logger.warning("deleting project: %s!", project.name)
528
            del self.projects[project.name]
529
            self.taskdb.drop(project.name)
530
            self.projectdb.drop(project.name)
531
            if self.resultdb:
532
                self.resultdb.drop(project.name)
533
            for each in self._cnt.values():
534
                del each[project.name]
535
536
    def __len__(self):
537
        return sum(len(x.task_queue) for x in itervalues(self.projects))
538
539
    def quit(self):
540
        '''Set quit signal'''
541
        self._quit = True
542
        # stop xmlrpc server
543
        if hasattr(self, 'xmlrpc_server'):
544
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
545
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
546
547
    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        # drain every pending cronjob tick before selecting tasks
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
558
559
    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a successful pass resets the failure streak
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                # tolerate transient failures, but give up after
                # EXCEPTION_LIMIT consecutive failing passes
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()
579
580
    def trigger_on_start(self, project):
581
        '''trigger an on_start callback of project'''
582
        self.newtask_queue.put({
583
            "project": project,
584
            "taskid": "on_start",
585
            "url": "data:,on_start",
586
            "process": {
587
                "callback": "on_start",
588
            },
589
        })
590
591
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface (blocks: runs its own tornado ioloop).'''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # counter snapshot for the web UI; errors are logged, not raised
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            # accept a task from the API; True when it passed verification
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force the next run_once pass to poll projectdb
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # whitelists keep internal/bulky fields out of the API response
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x.active_tasks) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # merge the per-project (newest-first) deques by timestamp
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip fields not in the whitelists (mutates the stored task)
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        # blocks until quit() schedules the ioloop stop
        self.xmlrpc_ioloop.start()
676
677
    def on_request(self, task):
678
        if self.INQUEUE_LIMIT and len(self.projects[task['project']].task_queue) >= self.INQUEUE_LIMIT:
679
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
680
            return
681
682
        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
683
                                       fields=self.merge_task_fields)
684
        if oldtask:
685
            return self.on_old_request(task, oldtask)
686
        else:
687
            return self.on_new_request(task)
688
689
    def on_new_request(self, task):
690
        '''Called when a new request is arrived'''
691
        task['status'] = self.taskdb.ACTIVE
692
        self.insert_task(task)
693
        self.put_task(task)
694
695
        project = task['project']
696
        self._cnt['5m'].event((project, 'pending'), +1)
697
        self._cnt['1h'].event((project, 'pending'), +1)
698
        self._cnt['1d'].event((project, 'pending'), +1)
699
        self._cnt['all'].event((project, 'pending'), +1)
700
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
701
        return task
702
703
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        if _schedule.get('force_update') and self.projects[task['project']].task_queue.is_processing(task['taskid']):
            # when a task is in processing, the modify may conflict with the running task.
            # postpone the modify after task finished.
            logger.info('postpone modify task %(project)s:%(taskid)s %(url)s', task)
            self._postpone_request.append(task)
            return

        # restart when the itag changed, the previous crawl aged out, or the
        # caller explicitly forced an update
        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        if _schedule.get('cancel'):
            # mark BAD and drop from the queue instead of restarting
            logger.info('cancel task %(project)s:%(taskid)s %(url)s', task)
            task['status'] = self.taskdb.BAD
            self.update_task(task)
            self.projects[task['project']].task_queue.delete(task['taskid'])
            return task

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        # move counters: the task is pending again; undo its previous
        # terminal state in the all-time counter
        project = task['project']
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
752
753
    def on_task_status(self, task):
754
        '''Called when a status pack is arrived'''
755
        try:
756
            procesok = task['track']['process']['ok']
757
            if not self.projects[task['project']].task_queue.done(task['taskid']):
758
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
759
                return None
760
        except KeyError as e:
761
            logger.error("Bad status pack: %s", e)
762
            return None
763
764
        if procesok:
765
            ret = self.on_task_done(task)
766
        else:
767
            ret = self.on_task_failed(task)
768
769
        if task['track']['fetch'].get('time'):
770
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
771
                                       task['track']['fetch']['time'])
772
        if task['track']['process'].get('time'):
773
            self._cnt['5m_time'].event((task['project'], 'process_time'),
774
                                       task['track']['process'].get('time'))
775
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
776
        return ret
777
778
    def on_task_done(self, task):
779
        '''Called when a task is done and success, called by `on_task_status`'''
780
        task['status'] = self.taskdb.SUCCESS
781
        task['lastcrawltime'] = time.time()
782
783
        if 'schedule' in task:
784
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
785
                task['status'] = self.taskdb.ACTIVE
786
                next_exetime = task['schedule'].get('age')
787
                task['schedule']['exetime'] = time.time() + next_exetime
788
                self.put_task(task)
789
            else:
790
                del task['schedule']
791
        self.update_task(task)
792
793
        project = task['project']
794
        self._cnt['5m'].event((project, 'success'), +1)
795
        self._cnt['1h'].event((project, 'success'), +1)
796
        self._cnt['1d'].event((project, 'success'), +1)
797
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
798
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
799
        return task
800
801
    def on_task_failed(self, task):
802
        '''Called when a task is failed, called by `on_task_status`'''
803
804
        if 'schedule' not in task:
805
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
806
            if old_task is None:
807
                logging.error('unknown status pack: %s' % task)
808
                return
809
            task['schedule'] = old_task.get('schedule', {})
810
811
        retries = task['schedule'].get('retries', self.default_schedule['retries'])
812
        retried = task['schedule'].get('retried', 0)
813
814
        project_info = self.projects[task['project']]
815
        retry_delay = project_info.retry_delay or self.DEFAULT_RETRY_DELAY
816
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))
817
818
        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
819
            next_exetime = min(next_exetime, task['schedule'].get('age'))
820
        else:
821
            if retried >= retries:
822
                next_exetime = -1
823
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
824
                next_exetime = task['schedule'].get('age')
825
826
        if next_exetime < 0:
827
            task['status'] = self.taskdb.FAILED
828
            task['lastcrawltime'] = time.time()
829
            self.update_task(task)
830
831
            project = task['project']
832
            self._cnt['5m'].event((project, 'failed'), +1)
833
            self._cnt['1h'].event((project, 'failed'), +1)
834
            self._cnt['1d'].event((project, 'failed'), +1)
835
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
836
            logger.info('task failed %(project)s:%(taskid)s %(url)s' % task)
837
            return task
838
        else:
839
            task['schedule']['retried'] = retried + 1
840
            task['schedule']['exetime'] = time.time() + next_exetime
841
            task['lastcrawltime'] = time.time()
842
            self.update_task(task)
843
            self.put_task(task)
844
845
            project = task['project']
846
            self._cnt['5m'].event((project, 'retry'), +1)
847
            self._cnt['1h'].event((project, 'retry'), +1)
848
            self._cnt['1d'].event((project, 'retry'), +1)
849
            # self._cnt['all'].event((project, 'retry'), +1)
850
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
851
                retried, retries), task)
852
            return task
853
854
    def on_select_task(self, task):
855
        '''Called when a task is selected to fetch & process'''
856
        # inject informations about project
857
        logger.info('select %(project)s:%(taskid)s %(url)s', task)
858
859
        project_info = self.projects.get(task['project'])
860
        assert project_info, 'no such project'
861
        task['group'] = project_info.group
862
        task['project_md5sum'] = project_info.md5sum
863
        task['project_updatetime'] = project_info.updatetime
864
        project_info.active_tasks.appendleft((time.time(), task))
865
        self.send_task(task)
866
        return task
867
868
869
from tornado import gen
870
871
872
class OneScheduler(Scheduler):
    """
    Scheduler for "one" mode: fetcher / processor / result_worker all run
    in the same process.

    Overwrites the send_task method to call
    processor.on_task(fetcher.fetch(task)) inline instead of pushing tasks
    through the fetcher message queue.
    """

    def _check_select(self):
        """
        Interactive mode of select tasks.

        Opens a python console exposing run()/crawl()/quit_interactive()/
        quit_pyspider(); delegates to the normal select loop when not
        interactive.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # non-empty => at least one task was crawled (or interactive mode was
        # left); still empty after the shell closes => shut pyspider down
        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # build the task pack through the project's handler instance
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # without explicit kwargs prefer the task already stored in
            # taskdb (lookup by taskid first, then by url)
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: delegate fully to the base implementation.  Without the
            # `return`, execution fell through and the status pack was
            # processed a second time below (double counter events and a
            # duplicate on_task_done/on_task_failed call).
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # fold fetch/process durations into the 5-minute time counters
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']].active_tasks.appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire up the in-process components used by one mode."""
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        # number of tasks currently being fetched/processed inline
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch & process one task inline, then drain the follow-up queues."""
        self.running_task += 1
        try:
            result = yield gen.Task(self.fetcher.fetch, task)
            # renamed from `type` to avoid shadowing the builtin
            status_type, task, response = result.args
            self.processor.on_task(task, response)
            # do with message
            while not self.processor.inqueue.empty():
                _task, _response = self.processor.inqueue.get()
                self.processor.on_task(_task, _response)
            # do with results
            while not self.processor.result_queue.empty():
                _task, _result = self.processor.result_queue.get()
                if self.result_worker:
                    self.result_worker.on_result(_task, _result)
        finally:
            # BUGFIX: decrement even when fetch/process raises, otherwise
            # running_task stays > 0 forever and blocks interactive
            # _check_select
            self.running_task -= 1

    def send_task(self, task, force=True):
        """Run the task in-process; buffer (force) or refuse (not force)
        when the http client has no free slot."""
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                # BUGFIX: return after buffering -- the task will be re-sent
                # later from _send_buffer; previously it was also dispatched
                # immediately below, executing it twice
                self._send_buffer.appendleft(task)
                return
            # NOTE(review): `self.outqueue` is not set anywhere visible in
            # this class -- confirm the attribute exists on the base class
            raise self.outqueue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        """Drive run_once from a 100ms periodic callback on the ioloop."""
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        """Stop the ioloop and exit."""
        self.ioloop.stop()
        logger.info("scheduler exiting...")
1032
1033
1034
import random
1035
import threading
1036
1037
1038
class ThreadBaseScheduler(Scheduler):
    """Scheduler variant that dispatches db-heavy operations to a pool of
    worker threads.

    Each worker thread lazily receives its own copy of taskdb / projectdb /
    resultdb (see the thread-local properties below).  Calls keyed by the
    same taskid are hashed to the same queue so their relative order is
    preserved.
    """

    def __init__(self, threads=4, *args, **kwargs):
        # threads: number of worker threads (one queue per worker)
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # keep the "master" db handles; worker threads copy() them lazily
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # lazily create a thread-local copy of the master taskdb
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # lazily create a thread-local copy of the master projectdb
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # lazily create a thread-local copy of the master resultdb
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        """Spawn one daemon worker thread (and its input queue) per thread."""
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        """Worker loop: execute queued (method, args, kwargs) packs forever."""
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)
            finally:
                # BUGFIX: pair every get() with task_done() so that
                # _wait_thread's join() waits for work to *finish*, not
                # merely to be dequeued
                queue.task_done()

    def _run_in_thread(self, method, *args, **kwargs):
        """Queue method(*args, **kwargs) for execution on a worker thread.

        _i pins the call to queue (i % threads), preserving ordering per key;
        _block waits for an idle queue before enqueueing and then waits for
        all queued work to complete.
        """
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            while True:
                # prefer an idle queue; with _block, keep waiting for one
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        # all queues busy: pick one at random
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        """Block until every queued call has been fully processed."""
        # BUGFIX: the old implementation polled queue.empty(), which could
        # return while a worker had dequeued an item but was still executing
        # it; join() (paired with task_done() in _thread_worker) waits for
        # actual completion, without busy-waiting
        for queue in self.thread_queues:
            queue.join()

    def _update_project(self, project):
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # hash the taskid so all updates for one task stay ordered
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        # hash the taskid so all updates for one task stay ordered
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        i = hash(taskid)
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()
1149