Completed
Push — master ( 313c98...8022cc )
by Roy
01:10
created

pyspider.scheduler.Scheduler.send_task()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 13
rs 9.4285
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import itertools
10
import json
11
import logging
12
import os
13
import time
14
from collections import deque
15
16
from six import iteritems, itervalues
17
from six.moves import queue as Queue
18
19
from pyspider.libs import counter, utils
20
from .task_queue import TaskQueue
21
22
logger = logging.getLogger('scheduler')
23
24
25
class Scheduler(object):
    '''Core dispatcher: tracks projects, per-project task queues and
    counters, and feeds selected tasks to the fetcher via out_queue.'''

    # Seconds between polls of projectdb for changed projects.
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # Fallback per-task schedule used when a task carries no 'schedule' dict.
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    LOOP_LIMIT = 1000       # max tasks handled per pass (_check_request/_check_select)
    LOOP_INTERVAL = 0.1     # seconds slept between run_once passes
    ACTIVE_TASKS = 100      # maxlen of each project's recent active_tasks deque
    INQUEUE_LIMIT = 0       # per-project queue cap in on_request; 0 disables it
    EXCEPTION_LIMIT = 3     # consecutive run_once failures tolerated before exit
    DELETE_TIME = 24 * 60 * 60  # seconds a STOPped project lingers before deletion
    # Retry backoff (seconds) keyed by retry count; '' is the catch-all default.
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
47
48
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''Wire the scheduler to its databases and queues.

        taskdb/projectdb/resultdb are database accessors; newtask_queue and
        status_queue are consumed, out_queue feeds the fetcher. Counter state
        is reloaded from data_path (scheduler.1h/.1d/.all dump files).
        '''
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb  # optional; only used when dropping a project
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue (see send_task);
        # retried at the start of every _check_select pass
        self._send_buffer = deque()
        self._quit = False
        self._exceptions = 0  # consecutive run_once failures (see run)
        self.projects = dict()  # project name -> cached project info dict
        self._force_update_project = False
        self._last_update_project = 0
        self.task_queue = dict()  # project name -> TaskQueue
        self._last_tick = int(time.time())

        # rolling statistics at several window sizes; '5m_time' records
        # fetch/process durations, the others count task state transitions
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        # restore persisted counters from the previous run, if any
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
83
84
    def _update_projects(self):
85
        '''Check project update'''
86
        now = time.time()
87
        if (
88
                not self._force_update_project
89
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
90
        ):
91
            return
92
        for project in self.projectdb.check_update(self._last_update_project):
93
            self._update_project(project)
94
            logger.debug("project: %s updated.", project['name'])
95
        self._force_update_project = False
96
        self._last_update_project = now
97
98
    def _update_project(self, project):
        '''Apply one projectdb record: refresh the cached info and the task queue.

        A RUNNING/DEBUG project gets its queue (re)loaded and a synthetic
        `_on_get_info` probe sent to refresh runtime info; any other status
        pauses and removes its queue.
        '''
        if project['name'] not in self.projects:
            self.projects[project['name']] = {}
        self.projects[project['name']].update(project)
        self.projects[project['name']]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[project['name']].get('active_tasks', None):
            self.projects[project['name']]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running and delete task_queue when project is stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if project['name'] not in self.task_queue:
                self._load_tasks(project['name'])
            self.task_queue[project['name']].rate = project['rate']
            self.task_queue[project['name']].burst = project['burst']

            # update project runtime info from processor by sending a _on_get_info
            # request, result is in status_page.track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': project['name'],
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if project['name'] in self.task_queue:
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]

            # BUG FIX: the membership test previously used the whole project
            # dict as the counter key; counters are keyed by project *name*
            # (see _load_tasks), so the check could never match.
            if project['name'] not in self._cnt['all']:
                self._update_project_cnt(project['name'])
136
137
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
138
139
    def _load_tasks(self, project):
        '''load tasks from database

        Creates the project's TaskQueue paused (rate/burst 0) while it is
        filled with every stored ACTIVE task, then applies the project's real
        rate/burst and seeds the counters with the loaded pending count.
        '''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # un-pause only if the project is actually running
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # initialize absolute counters on first sight, then record the freshly
        # loaded queue length as the project's pending count
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
162
163
    def _update_project_cnt(self, project):
164
        status_count = self.taskdb.status_count(project)
165
        self._cnt['all'].value(
166
            (project, 'success'),
167
            status_count.get(self.taskdb.SUCCESS, 0)
168
        )
169
        self._cnt['all'].value(
170
            (project, 'failed'),
171
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
172
        )
173
        self._cnt['all'].value(
174
            (project, 'pending'),
175
            status_count.get(self.taskdb.ACTIVE, 0)
176
        )
177
178
    def task_verify(self, task):
179
        '''
180
        return False if any of 'taskid', 'project', 'url' is not in task dict
181
                        or project in not in task_queue
182
        '''
183
        for each in ('taskid', 'project', 'url', ):
184
            if each not in task or not task[each]:
185
                logger.error('%s not in task: %.200r', each, task)
186
                return False
187
        if task['project'] not in self.task_queue:
188
            logger.error('unknown project: %s', task['project'])
189
            return False
190
        return True
191
192
    def insert_task(self, task):
193
        '''insert task into database'''
194
        return self.taskdb.insert(task['project'], task['taskid'], task)
195
196
    def update_task(self, task):
197
        '''update task in database'''
198
        return self.taskdb.update(task['project'], task['taskid'], task)
199
200
    def put_task(self, task):
201
        '''put task to task queue'''
202
        _schedule = task.get('schedule', self.default_schedule)
203
        self.task_queue[task['project']].put(
204
            task['taskid'],
205
            priority=_schedule.get('priority', self.default_schedule['priority']),
206
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
207
        )
208
209
    def send_task(self, task, force=True):
210
        '''
211
        dispatch task to fetcher
212
213
        out queue may have size limit to prevent block, a send_buffer is used
214
        '''
215
        try:
216
            self.out_queue.put_nowait(task)
217
        except Queue.Full:
218
            if force:
219
                self._send_buffer.appendleft(task)
220
            else:
221
                raise
222
223
    def _check_task_done(self):
        '''Drain the status queue and process each finished-task pack.

        `_on_get_info` packs are intercepted here: their track.save payload is
        merged into the cached project info instead of going through
        on_task_status. Returns the number of regular packs handled.
        '''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    if task['project'] not in self.projects:
                        # project vanished since the probe was sent; drop it
                        continue
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained; fall through with the count collected so far
            pass
        return cnt
245
246
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
247
248
    def _check_request(self):
        '''Drain up to LOOP_LIMIT new tasks from newtask_queue and route them.

        Deduplicates by taskid: a task already in its project's queue, or
        already seen in this batch, is dropped unless its schedule sets
        force_update. Returns the number of distinct tasks passed to
        on_request.
        '''
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # the queue may carry a single task or a batch (list) of tasks
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already queued: only force_update may re-submit it
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: keep the first unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
281
282
    def _check_cronjob(self):
        """Check projects cronjob tick; return True when a new tick is sent.

        Ticks advance one simulated second per call so ticks are never
        skipped even if the loop falls behind; the caller loops until this
        returns False (wall clock caught up).
        """
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            return False
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            if project.get('min_tick', 0) == 0:
                continue
            # only fire for projects whose interval divides the current tick
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
311
312
    # Fields loaded from taskdb for a task that is about to be dispatched.
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
323
324
    def _check_select(self):
        '''Select task to fetch & process.

        First retries tasks parked in _send_buffer; then, unless out_queue is
        already full, pulls up to LOOP_LIMIT ready taskids across projects
        (at most LOOP_LIMIT/10 per project) and dispatches each through
        _load_put_task. Returns {project: selected_count}.
        '''
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                # still no room: put the task back and stop retrying this round
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            # fetcher is saturated; selecting more would only buffer them
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
365
366
    def _load_put_task(self, project, taskid):
367
        task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
368
        if not task:
369
            return
370
        task = self.on_select_task(task)
371
372
    def _print_counter_log(self):
        '''Log a one-line 5-minute summary: global totals plus the
        top-failing (up to 2) and remaining top-active (up to 3) projects.'''
        # print top 5 active counters
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # BUG FIX: the filter used to test `x[1] not in top_2_fails`, comparing
        # a project *name* against (count, project) tuples — it never matched,
        # so a top-failing project could be listed twice. Compare names instead.
        fail_projects = set(project for _, project in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_projects],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
409
410
    def _dump_cnt(self):
411
        '''Dump counters to file'''
412
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
413
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
414
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
415
416
    def _try_dump_cnt(self):
417
        '''Dump counters every 60 seconds'''
418
        now = time.time()
419
        if now - self._last_dump_cnt > 60:
420
            self._last_dump_cnt = now
421
            self._dump_cnt()
422
            self._print_counter_log()
423
424
    def _check_delete(self):
        '''Check project delete.

        A project is purged only when all three hold: status is STOP, its
        last update is older than DELETE_TIME, and 'delete' appears in its
        group. Purging removes the queue, cached info, db rows and counters.
        '''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                # pause before removal so no further tasks are selected
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
            for each in self._cnt.values():
                del each[project['name']]
447
448
    def __len__(self):
449
        return sum(len(x) for x in itervalues(self.task_queue))
450
451
    def quit(self):
452
        '''Set quit signal'''
453
        self._quit = True
454
        # stop xmlrpc server
455
        if hasattr(self, 'xmlrpc_server'):
456
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
457
            self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
458
459
    def run_once(self):
460
        '''comsume queues and feed tasks to fetcher, once'''
461
462
        self._update_projects()
463
        self._check_task_done()
464
        self._check_request()
465
        while self._check_cronjob():
466
            pass
467
        self._check_select()
468
        self._check_delete()
469
        self._try_dump_cnt()
470
471
    def run(self):
        '''Start scheduler loop

        Runs run_once every LOOP_INTERVAL seconds until quit() is called or
        Ctrl-C. Up to EXCEPTION_LIMIT consecutive failures are tolerated
        before the loop gives up; counters are dumped on exit.
        '''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                self._exceptions = 0  # a clean pass resets the failure streak
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        self._dump_cnt()
491
492
    def trigger_on_start(self, project):
493
        '''trigger an on_start callback of project'''
494
        self.newtask_queue.put({
495
            "project": project,
496
            "taskid": "on_start",
497
            "url": "data:,on_start",
498
            "process": {
499
                "callback": "on_start",
500
            },
501
        })
502
503
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''Start xmlrpc interface

        Registers the control endpoints (_quit, size, counter, newtask,
        send_task, update_project, get_active_tasks) and serves them with a
        tornado HTTP server on bind:port. Blocks until the ioloop stops.
        NOTE(review): `logRequests` is accepted but never used here — confirm
        whether it should be wired into the server.
        '''
        from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication

        application = WSGIXMLRPCApplication()

        application.register_function(self.quit, '_quit')
        application.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # snapshot one counter group, e.g. ('5m', 'sum'); errors are
            # logged and None is returned instead of raising over RPC
            try:
                return self._cnt[_time].to_dict(_type)
            except:
                logger.exception('')
        application.register_function(dump_counter, 'counter')

        def new_task(task):
            # enqueue a task submitted over RPC; False when task_verify rejects it
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        application.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        application.register_function(send_task, 'send_task')

        def update_project():
            # force the next _update_projects pass to poll projectdb
            self._force_update_project = True
        application.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # newest-first merge of the per-project active_tasks deques,
            # with each task stripped down to a whitelist of fields
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                # pick the newest (largest timestamp) head among the iterators
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                for key in list(task):
                    if key == 'track':
                        # track is kept but filtered to the track whitelist
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        application.register_function(get_active_tasks, 'get_active_tasks')

        import tornado.wsgi
        import tornado.ioloop
        import tornado.httpserver

        container = tornado.wsgi.WSGIContainer(application)
        self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
        self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
        self.xmlrpc_server.listen(port=port, address=bind)
        self.xmlrpc_ioloop.start()
588
589
    def on_request(self, task):
590
        if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
591
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
592
            return
593
594
        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
595
                                       fields=self.merge_task_fields)
596
        if oldtask:
597
            return self.on_old_request(task, oldtask)
598
        else:
599
            return self.on_new_request(task)
600
601
    def on_new_request(self, task):
602
        '''Called when a new request is arrived'''
603
        task['status'] = self.taskdb.ACTIVE
604
        self.insert_task(task)
605
        self.put_task(task)
606
607
        project = task['project']
608
        self._cnt['5m'].event((project, 'pending'), +1)
609
        self._cnt['1h'].event((project, 'pending'), +1)
610
        self._cnt['1d'].event((project, 'pending'), +1)
611
        self._cnt['all'].event((project, 'pending'), +1)
612
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
613
        return task
614
615
    def on_old_request(self, task, old_task):
        '''Called when a crawled task is arrived

        Decides whether the stored task must be restarted: the itag changed,
        its age expired, or force_update is set. On restart the task is
        re-queued and the counters shifted from its previous status to
        pending. Returns the task, or None when the request is ignored.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # a task that was still ACTIVE is only refreshed, not re-added,
        # so the windowed pending counters are left untouched
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
650
651
    def on_task_status(self, task):
652
        '''Called when a status pack is arrived'''
653
        try:
654
            procesok = task['track']['process']['ok']
655
            if not self.task_queue[task['project']].done(task['taskid']):
656
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
657
                return None
658
        except KeyError as e:
659
            logger.error("Bad status pack: %s", e)
660
            return None
661
662
        if procesok:
663
            ret = self.on_task_done(task)
664
        else:
665
            ret = self.on_task_failed(task)
666
667
        if task['track']['fetch'].get('time'):
668
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
669
                                       task['track']['fetch']['time'])
670
        if task['track']['process'].get('time'):
671
            self._cnt['5m_time'].event((task['project'], 'process_time'),
672
                                       task['track']['process'].get('time'))
673
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
674
        return ret
675
676
    def on_task_done(self, task):
677
        '''Called when a task is done and success, called by `on_task_status`'''
678
        task['status'] = self.taskdb.SUCCESS
679
        task['lastcrawltime'] = time.time()
680
681
        if 'schedule' in task:
682
            if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
683
                task['status'] = self.taskdb.ACTIVE
684
                next_exetime = task['schedule'].get('age')
685
                task['schedule']['exetime'] = time.time() + next_exetime
686
                self.put_task(task)
687
            else:
688
                del task['schedule']
689
        self.update_task(task)
690
691
        project = task['project']
692
        self._cnt['5m'].event((project, 'success'), +1)
693
        self._cnt['1h'].event((project, 'success'), +1)
694
        self._cnt['1d'].event((project, 'success'), +1)
695
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
696
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
697
        return task
698
699
    def on_task_failed(self, task):
700
        '''Called when a task is failed, called by `on_task_status`'''
701
702
        if 'schedule' not in task:
703
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
704
            if old_task is None:
705
                logging.error('unknown status pack: %s' % task)
706
                return
707
            task['schedule'] = old_task.get('schedule', {})
708
709
        retries = task['schedule'].get('retries', self.default_schedule['retries'])
710
        retried = task['schedule'].get('retried', 0)
711
712
        project_info = self.projects.get(task['project'], {})
713
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
714
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))
715
716
        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
717
            next_exetime = min(next_exetime, task['schedule'].get('age'))
718
        else:
719
            if retried >= retries:
720
                next_exetime = -1
721
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
722
                next_exetime = task['schedule'].get('age')
723
724
        if next_exetime < 0:
725
            task['status'] = self.taskdb.FAILED
726
            task['lastcrawltime'] = time.time()
727
            self.update_task(task)
728
729
            project = task['project']
730
            self._cnt['5m'].event((project, 'failed'), +1)
731
            self._cnt['1h'].event((project, 'failed'), +1)
732
            self._cnt['1d'].event((project, 'failed'), +1)
733
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
734
            logger.info('task failed %(project)s:%(taskid)s %(url)s' % task)
735
            return task
736
        else:
737
            task['schedule']['retried'] = retried + 1
738
            task['schedule']['exetime'] = time.time() + next_exetime
739
            task['lastcrawltime'] = time.time()
740
            self.update_task(task)
741
            self.put_task(task)
742
743
            project = task['project']
744
            self._cnt['5m'].event((project, 'retry'), +1)
745
            self._cnt['1h'].event((project, 'retry'), +1)
746
            self._cnt['1d'].event((project, 'retry'), +1)
747
            # self._cnt['all'].event((project, 'retry'), +1)
748
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
749
                retried, retries), task)
750
            return task
751
752
    def on_select_task(self, task):
753
        '''Called when a task is selected to fetch & process'''
754
        # inject informations about project
755
        logger.info('select %(project)s:%(taskid)s %(url)s', task)
756
757
        project_info = self.projects.get(task['project'])
758
        assert project_info, 'no such project'
759
        task['group'] = project_info.get('group')
760
        task['project_md5sum'] = project_info.get('md5sum')
761
        task['project_updatetime'] = project_info.get('updatetime', 0)
762
        project_info['active_tasks'].appendleft((time.time(), task))
763
        self.send_task(task)
764
        return task
765
766
767
from tornado import gen
768
769
770
class OneScheduler(Scheduler):
    """
    Scheduler for "one" mode: scheduler, fetcher, processor (and optionally
    the result worker) run inside a single process on one IOLoop.

    Overrides send_task to run ``processor.on_task(fetcher.fetch(task))``
    directly instead of pushing the task onto an out-queue consumed by a
    separate fetcher process.
    """

    def _check_select(self):
        """
        interactive mode of select tasks

        Opens a python console in which the user drives crawling by hand
        through ``crawl()``/``run()``, or leaves via ``quit_interactive()``
        / ``quit_pyspider()``.
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # non-empty after the console exits -> a task was selected (or
        # interactive mode was left); empty -> user asked to close pyspider
        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # build the task package through the project's handler instance
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # without explicit parameters, prefer the stored task from
            # taskdb (lookup by taskid first, then by url)
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.add_callback(self.ioloop.stop)

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        # In interactive mode unknown attributes resolve to their own name,
        # so `callback=self.index_page` works without a handler instance.
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: return after delegating to the base implementation.
            # Falling through would run the done/failed handling a second
            # time, double-counting counter events and active_tasks entries.
            return super(OneScheduler, self).on_task_status(task)

        try:
            procesok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if procesok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)
        # update the 5-minute timing counters when the track carries timings
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire in the in-process components used by one mode."""
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch and process one task inline, then drain any follow-up
        messages and results the processor produced."""
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        # renamed from `type` to avoid shadowing the builtin
        _type, task, response = result.args
        self.processor.on_task(task, response)
        # do with message
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        """Run the task on the local fetcher/processor instead of pushing
        it to an out-queue; buffer (or refuse) it when the fetcher's HTTP
        client has no free slots."""
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                self._send_buffer.appendleft(task)
                # BUGFIX: was missing; without it the buffered task was ALSO
                # executed immediately, so it ran twice once the send buffer
                # was drained. Parent Scheduler.send_task buffers-and-returns.
                return
            else:
                # BUGFIX: was `raise self.outqueue.Full` — no such attribute
                # exists (Scheduler stores `out_queue`, and in interactive
                # mode __getattr__ turns it into the string 'outqueue').
                # Raise the queue module's Full, the exception callers of the
                # base send_task already expect on back-pressure.
                raise Queue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        """Drive run_once from a periodic callback on the shared IOLoop."""
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        """Stop the IOLoop and shut the scheduler down."""
        self.ioloop.stop()
        logger.info("scheduler exiting...")


import random
import threading


class ThreadBaseScheduler(Scheduler):
    """Scheduler variant that handles task messages in a pool of worker
    threads.

    Each thread lazily gets its own copy of the taskdb/projectdb/resultdb
    connections (kept in ``threading.local``), so the underlying DB handles
    are never shared between threads.  Work for the same taskid is always
    routed to the same worker queue to preserve per-task ordering.
    """

    def __init__(self, threads=4, *args, **kwargs):
        # threads: size of the worker pool
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # Stash the connections created by the base __init__ as the
        # "master" copies; the properties below clone them per thread.
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # First access from a new thread clones the master connection.
        if not hasattr(self.local, 'taskdb'):
            self.taskdb = self._taskdb.copy()
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # Same lazy per-thread clone as taskdb.
        if not hasattr(self.local, 'projectdb'):
            self.projectdb = self._projectdb.copy()
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # Same lazy per-thread clone as taskdb.
        # NOTE(review): assumes self._resultdb is not None here — .copy()
        # would raise otherwise; verify callers guard the resultdb=None case.
        if not hasattr(self.local, 'resultdb'):
            self.resultdb = self._resultdb.copy()
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    def _start_threads(self):
        """Spawn one daemon worker thread per queue."""
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        """Worker loop: pop (method, args, kwargs) jobs and run them,
        logging (not propagating) any exception so the thread survives."""
        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        """Dispatch a job to a worker queue.

        kwargs may carry two control keys (consumed here, not passed on):
        _i     - pin the job to queue i % threads (per-key ordering);
        _block - wait until all worker queues are drained before returning.
        """
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            # No pinning: prefer an idle (empty) queue; if none is idle,
            # either wait (_block) or pick one at random.
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        """Poll until every worker queue is empty.

        NOTE: empty() only means the job was popped — the worker may still
        be executing the last job when this returns.
        """
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        # Project updates may run on any worker.
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # Pin by taskid so status updates for one task stay ordered.
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_task_status, self, task, _i=i)

    def on_request(self, task):
        # Pin by taskid so request handling for one task stays ordered.
        i = hash(task['taskid'])
        self._run_in_thread(Scheduler.on_request, self, task, _i=i)

    def _load_put_task(self, project, taskid):
        # Pin by taskid, consistent with on_request/on_task_status routing.
        i = hash(taskid)
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid, _i=i)

    def run_once(self):
        # Run one base-scheduler iteration, then drain the worker queues so
        # iterations don't pile up work faster than the pool consumes it.
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()