Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

OneScheduler.quit_interactive()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 5
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from .task_queue import TaskQueue
21
22
logger = logging.getLogger('scheduler')
23
24
25
class Scheduler(object):
    """Core task scheduler: merges new and re-crawled requests, keeps one
    in-memory TaskQueue per running project, dispatches selected tasks to the
    fetcher via out_queue, and maintains per-project counters."""

    # How often (seconds) to poll projectdb for project updates.
    UPDATE_PROJECT_INTERVAL = 5 * 60

    # Fallback schedule used when a task carries no 'schedule' dict.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }

    # Max tasks handled per loop pass (new-task drain / select budget).
    LOOP_LIMIT = 1000
    # Sleep between scheduler loop iterations, in seconds.
    LOOP_INTERVAL = 0.1
    # Length of the per-project 'active_tasks' history deque.
    ACTIVE_TASKS = 100
    # Per-project in-queue cap; 0 disables the limit (see on_request).
    INQUEUE_LIMIT = 0
    # Consecutive loop exceptions tolerated before run() exits.
    EXCEPTION_LIMIT = 3
    # A STOP-ed project in the 'delete' group is dropped after this many seconds.
    DELETE_TIME = 24 * 60 * 60
    # Retry back-off (seconds) keyed by retried count; '' is the catch-all.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
47
48
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        """
        :param taskdb: task database interface
        :param projectdb: project database interface
        :param newtask_queue: queue of new tasks coming from the processor
        :param status_queue: queue of task status packs from the processor
        :param out_queue: queue of tasks dispatched to the fetcher
        :param data_path: directory where counter dumps are stored
        :param resultdb: optional result database (used on project deletion)
        """
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # Overflow buffer used when out_queue is full (see send_task).
        self._send_buffer = deque()
        self._quit = False
        # Consecutive run-loop exception count, compared to EXCEPTION_LIMIT.
        self._exceptions = 0
        # project name -> project info dict merged from projectdb records.
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        # project name -> TaskQueue, only for RUNNING/DEBUG projects.
        self.task_queue = dict()
        self._last_tick = int(time.time())
        # project name -> 'need'/'sent' marker for the on_finished event.
        self._sent_finished_event = dict()

        # Counters over several time windows; 1h/1d/all are persisted to disk.
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
84
85
    def _update_projects(self):
86
        '''Check project update'''
87
        now = time.time()
88
        if (
89
                not self._force_update_project
90
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
91
        ):
92
            return
93
        for project in self.projectdb.check_update(self._last_update_project):
94
            self._update_project(project)
95
            logger.debug("project: %s updated.", project['name'])
96
        self._force_update_project = False
97
        self._last_update_project = now
98
99
    def _update_project(self, project):
        '''update one project'''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        # bounded history of recent tasks, read back by get_active_tasks
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]

            # BUG FIX: counters are keyed by the project *name* (see
            # _update_project_cnt and the identical guard in _load_tasks);
            # the original tested the project dict itself, so this guard
            # could never match the counter keys.
            if project['name'] not in self._cnt['all']:
                self._update_project_cnt(project['name'])
137
138
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
139
140
    def _load_tasks(self, project):
        '''load tasks from database'''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        # Re-queue every ACTIVE task of the project with its stored
        # priority/exetime, falling back to default_schedule values.
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # Apply the project's rate/burst only when it is actually running.
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # Seed cumulative counters on first sight of the project, then record
        # the current pending count.
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
163
164
    def _update_project_cnt(self, project):
        """Seed the cumulative 'all' counters for *project* from the task
        database's per-status counts."""
        status_count = self.taskdb.status_count(project)
        success = status_count.get(self.taskdb.SUCCESS, 0)
        failed = status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
        pending = status_count.get(self.taskdb.ACTIVE, 0)
        self._cnt['all'].value((project, 'success'), success)
        self._cnt['all'].value((project, 'failed'), failed)
        self._cnt['all'].value((project, 'pending'), pending)
178
179
    def task_verify(self, task):
180
        '''
181
        return False if any of 'taskid', 'project', 'url' is not in task dict
182
                        or project in not in task_queue
183
        '''
184
        for each in ('taskid', 'project', 'url', ):
185
            if each not in task or not task[each]:
186
                logger.error('%s not in task: %.200r', each, task)
187
                return False
188
        if task['project'] not in self.task_queue:
189
            logger.error('unknown project: %s', task['project'])
190
            return False
191
        return True
192
193
    def insert_task(self, task):
        '''Insert a new task record into the task database.'''
        project, taskid = task['project'], task['taskid']
        return self.taskdb.insert(project, taskid, task)
196
197
    def update_task(self, task):
        '''Update an existing task record in the task database.'''
        project, taskid = task['project'], task['taskid']
        return self.taskdb.update(project, taskid, task)
200
201
    def put_task(self, task):
        '''Push a task onto its project's in-memory task queue.'''
        schedule = task.get('schedule', self.default_schedule)
        priority = schedule.get('priority', self.default_schedule['priority'])
        exetime = schedule.get('exetime', self.default_schedule['exetime'])
        self.task_queue[task['project']].put(
            task['taskid'],
            priority=priority,
            exetime=exetime
        )
209
210
    def send_task(self, task, force=True):
211
        '''
212
        dispatch task to fetcher
213
214
        out queue may have size limit to prevent block, a send_buffer is used
215
        '''
216
        try:
217
            self.out_queue.put_nowait(task)
218
        except Queue.Full:
219
            if force:
220
                self._send_buffer.appendleft(task)
221
            else:
222
                raise
223
224
    def _check_task_done(self):
        '''Check status queue'''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    if task['project'] not in self.projects:
                        continue
                    # merge runtime info (the 'save' payload requested by
                    # _update_project) into the project record
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
246
247
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
248
249
    def _check_request(self):
        '''Check new task queue'''
        # Drain up to LOOP_LIMIT new tasks, de-duplicating by taskid.
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # queue items may be a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # skip tasks already queued, unless force_update is set
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # skip duplicates within this batch, unless force_update is set
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
282
283
    def _check_cronjob(self):
        """Check projects cronjob tick, return True when a new tick is sent"""
        now = time.time()
        self._last_tick = int(self._last_tick)
        # advance at most one tick per call; run_once loops on this method
        # until the tick counter catches up with wall time
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            # only running projects with a configured min_tick get cron events
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
312
313
    # fields fetched from taskdb when a task has been selected for
    # fetching/processing (_load_put_task)
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
324
325
    def _check_select(self):
        '''Select task to fetch & process'''
        # First, retry tasks parked in the send buffer (out_queue was full).
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            # each project gets at most 1/10 of the loop budget per pass
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1

            cnt_dict[project] = project_cnt
            if project_cnt:
                # tasks were dispatched: an on_finished event is owed later
                self._sent_finished_event[project] = 'need'
            # check and send finished event to project
            elif len(task_queue) == 0 and self._sent_finished_event.get(project) == 'need':
                self._sent_finished_event[project] = 'sent'
                self.on_select_task({
                    'taskid': 'on_finished',
                    'project': project,
                    'url': 'data:,on_finished',
                    'status': self.taskdb.SUCCESS,
                    'process': {
                        'callback': 'on_finished',
                    },
                })

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
381
382
    def _load_put_task(self, project, taskid):
        '''Load a selected task from taskdb and dispatch it via on_select_task.'''
        task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
        if not task:
            # the task may have been removed from taskdb while queued
            return
        # on_select_task enriches the task and sends it to the fetcher; its
        # return value was pointlessly re-assigned to a dead local before.
        self.on_select_task(task)
387
388
    def _print_counter_log(self):
        # Log a one-line 5-minute summary plus the top-5 most interesting
        # projects: the 2 with most failures and up to 3 most active others.
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUG FIX: top_2_fails holds (count, project) tuples, so the original
        # `x[1] not in top_2_fails` never excluded anything; compare against
        # the project names instead.
        fail_names = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
425
426
    def _dump_cnt(self):
        '''Persist the 1h/1d/all counters to files under data_path.'''
        for name in ('1h', '1d', 'all'):
            self._cnt[name].dump(os.path.join(self.data_path, 'scheduler.' + name))
431
432
    def _try_dump_cnt(self):
433
        '''Dump counters every 60 seconds'''
434
        now = time.time()
435
        if now - self._last_dump_cnt > 60:
436
            self._last_dump_cnt = now
437
            self._dump_cnt()
438
            self._print_counter_log()
439
440
    def _check_delete(self):
        '''Check project delete'''
        now = time.time()
        # iterate over a copy because projects are deleted during iteration
        for project in list(itervalues(self.projects)):
            # delete only STOP-ed projects, untouched for DELETE_TIME,
            # whose group contains the 'delete' marker
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
            # clear all counter windows for the deleted project
            for each in self._cnt.values():
                del each[project['name']]
463
464
    def __len__(self):
        """Total number of queued tasks across all projects."""
        total = 0
        for task_queue in itervalues(self.task_queue):
            total += len(task_queue)
        return total
466
467
    def quit(self):
468
        '''Set quit signal'''
469
        self._quit = True
470
        # stop xmlrpc server
471
        if hasattr(self, 'xmlrpc_server'):
472
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
473
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
474
475
    def run_once(self):
        '''consume queues and feed tasks to fetcher, once'''
        self._update_projects()
        self._check_task_done()
        self._check_request()
        # catch up all pending cron ticks before selecting tasks
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
486
487
    def run(self):
        '''Start scheduler loop'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a clean iteration resets the consecutive-exception counter
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                # give up after EXCEPTION_LIMIT consecutive failing iterations
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        # final counter dump so stats survive a restart
        self._dump_cnt()
507
508
    def trigger_on_start(self, project):
509
        '''trigger an on_start callback of project'''
510
        self.newtask_queue.put({
511
            "project": project,
512
            "taskid": "on_start",
513
            "url": "data:,on_start",
514
            "process": {
515
                "callback": "on_start",
516
            },
517
        })
518
519
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface'''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            '''Return the counter window `_time` rendered as `_type`.'''
            try:
                return self._cnt[_time].to_dict(_type)
            # BUG FIX: was a bare `except:`, which also swallows
            # SystemExit/KeyboardInterrupt; catch only real errors.
            except Exception:
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            '''Validate and enqueue a task submitted over RPC.'''
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            '''Force a project refresh on the next loop iteration.'''
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            '''Return up to `limit` recent tasks, newest first, restricted to
            `project` when given and filtered to whitelisted keys.'''
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            # merge the per-project histories by (updatetime, task), newest first
            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip non-whitelisted keys before returning over RPC
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        # NOTE(review): HTTPServer's `io_loop` argument was removed in
        # Tornado 5 -- confirm the pinned tornado version still accepts it.
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()
604
605
    def on_request(self, task):
        '''Entry point for an incoming request: route it to the new-task or
        existing-task handler, honoring the optional in-queue limit.'''
        if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
            return

        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
                                       fields=self.merge_task_fields)
        if not oldtask:
            return self.on_new_request(task)
        return self.on_old_request(task, oldtask)
616
617
    def on_new_request(self, task):
        '''Called when a new request is arrived'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        # one new pending task in every counter window
        project = task['project']
        for window in ('5m', '1h', '1d', 'all'):
            self._cnt[window].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
630
631
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived'''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        # decide whether the task should be re-crawled
        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            # itag differs from the stored one
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            # last crawl is older than the allowed age
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # windowed counters only count tasks newly moving into pending
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        # keep cumulative totals consistent with the status transition
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
666
667
    def on_task_status(self, task):
        '''Called when a status pack is arrived'''
        try:
            procesok = task['track']['process']['ok']
            if not self.task_queue[task['project']].done(task['taskid']):
                # BUG FIX: was `logging.error`, which bypasses the module's
                # 'scheduler' logger and its configuration
                logger.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
                return None
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # record fetch/process timings into the 5m time counters
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        # keep the recent-task history consumed by get_active_tasks
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret
691
692
    def on_task_done(self, task):
        '''Called when a task is done and success, called by `on_task_status`'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
                # auto_recrawl: keep ACTIVE and re-queue 'age' seconds from now
                task['status'] = self.taskdb.ACTIVE
                next_exetime = task['schedule'].get('age')
                task['schedule']['exetime'] = time.time() + next_exetime
                self.put_task(task)
            else:
                # one-shot schedule has served its purpose
                del task['schedule']
        self.update_task(task)

        project = task['project']
        self._cnt['5m'].event((project, 'success'), +1)
        self._cnt['1h'].event((project, 'success'), +1)
        self._cnt['1d'].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
714
715
    def on_task_failed(self, task):
        '''Called when a task is failed, called by `on_task_status`'''

        if 'schedule' not in task:
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # BUG FIX: was `logging.error` with eager %-formatting; use the
                # module's 'scheduler' logger with lazy args like the rest of
                # the file
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        # per-project retry delays override the defaults; '' is the catch-all
        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto_recrawl tasks never give up; retry within the age window
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            # retries exhausted: mark the task FAILED
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # lazy logger args (was eager `% task`), consistent with siblings
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            # schedule the next retry attempt
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
767
768
    def on_select_task(self, task):
        '''Called when a task is selected to fetch & process'''
        # inject informations about project
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        project_info = self.projects.get(task['project'])
        # NOTE(review): assert is stripped under `python -O`; confirm whether
        # a missing project should really abort here or be handled explicitly.
        assert project_info, 'no such project'
        task['group'] = project_info.get('group')
        task['project_md5sum'] = project_info.get('md5sum')
        task['project_updatetime'] = project_info.get('updatetime', 0)
        project_info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task
781
782
783
from tornado import gen
784
785
786
class OneScheduler(Scheduler):
    """
    Scheduler subclass for "one" (single-process) mode.

    Overwrites the send_task method: runs
    processor.on_task(fetcher.fetch(task)) inline on the ioloop instead
    of pushing tasks through the inter-process queues.
    """

    def _check_select(self):
        """
        Interactive mode of select tasks.

        Falls back to the normal batch selection when not interactive;
        otherwise drops into a python console where the user can crawl()
        single tasks, quit interactive mode, or close pyspider.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # collects a truthy marker per crawled task; emptied by
        # quit_pyspider() to signal "stop the ioloop"
        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # build the task package through the project's script instance
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # prefer the stored task from taskdb (by taskid, then by url)
            # when the caller supplied no explicit parameters
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        # nothing was crawled and the user did not continue -> shut down
        if not is_crawled:
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        # in interactive mode unknown attributes echo their own name, so
        # `callback=self.index_page` resolves to the string 'index_page'
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: must return here -- previously the super() call fully
            # handled the status pack and the code below then processed it
            # a second time, double-counting the _cnt events.
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        # feed fetch/process timing into the 5-minute counters
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire the in-process components together (one-mode only)."""
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch, process and store results for a single task, inline."""
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        type, task, response = result.args
        self.processor.on_task(task, response)
        # drain follow-up messages produced by the processor
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # drain results and hand them to the result worker, if any
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        """Run the task on the ioloop instead of pushing to out_queue.

        When the fetcher's http client has no free slot, the task is
        buffered (force=True) or Queue.Full is raised (force=False).
        """
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                # NOTE(review): the task is buffered here but execution
                # still falls through to add_future below -- looks like it
                # may run twice once the buffer is drained; confirm intent.
                self._send_buffer.appendleft(task)
            else:
                # BUGFIX: was `raise self.outqueue.Full` -- the attribute is
                # `out_queue`, and a queue *instance* has no `Full` member,
                # so this raised AttributeError. Raise the module-level
                # queue.Full exception instead.
                raise Queue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        import tornado.ioloop
        # drive run_once() every 100ms on the tornado ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")
946
947
948
import random
949
import threading
950
951
952
class ThreadBaseScheduler(Scheduler):
    # Scheduler variant that offloads db-heavy handlers (status packs,
    # new requests, task loading) to a pool of worker threads, each with
    # its own thread-local copies of the database connections.

    def __init__(self, threads=4, *args, **kwargs):
        # threads: number of worker threads to spawn
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # keep the original (shared) db handles; per-thread copies are
        # created lazily by the property getters below
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # lazily clone the shared taskdb for the calling thread
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # lazily clone the shared projectdb for the calling thread
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # lazily clone the shared resultdb for the calling thread
        # NOTE(review): assumes resultdb is not None -- confirm callers
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        # spawn daemon workers, one dedicated queue per thread
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        # worker loop: execute queued (method, args, kwargs) jobs forever,
        # logging (not propagating) any exception so the thread survives
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        """Dispatch method(*args, **kwargs) to a worker thread.

        _i: pin the job to thread (i % threads) -- used to serialize all
            work for one taskid on the same thread.
        _block: wait until every queue is drained after enqueueing.
        """
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            # pick the first idle queue; if none is idle either wait
            # (_block) or fall back to a random queue
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    # no empty queue found (for-loop ran to completion)
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        # busy-wait until all worker queues are empty
        # NOTE(review): an empty queue does not guarantee the last job
        # finished executing, only that it was dequeued
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        # run the (db-touching) project update on any worker thread
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # pin status handling for a taskid to one thread to keep its
        # db updates ordered
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        # same pinning scheme as on_task_status
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        # pin task loading for a taskid to one thread
        i = hash(taskid)
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

    def run_once(self):
        # one scheduler iteration, then drain the worker queues so the
        # next iteration sees a consistent db state
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()
1063