Completed
Push — master ( b244b2...c54284 )
by Roy
59s
created

_load_put_task()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2014-02-07 17:05:11
7
8
9
import os
10
import json
11
import time
12
import logging
13
import itertools
14
from collections import deque
15
16
from six import iteritems, itervalues
17
18
from pyspider.libs import counter, utils
19
from six.moves import queue as Queue
20
from .task_queue import TaskQueue
21
logger = logging.getLogger('scheduler')
22
23
24
class Scheduler(object):
    '''
    Core scheduler: tracks projects, keeps one TaskQueue per running
    project, merges incoming requests with taskdb state, and feeds
    selected tasks to the fetcher through out_queue.
    '''

    # seconds between polls of projectdb for changed projects
    UPDATE_PROJECT_INTERVAL = 5 * 60
    # defaults for missing keys in a task's 'schedule' dict
    default_schedule = {
        'priority': 0,
        'retries': 3,
        'exetime': 0,
        'age': -1,
        'itag': None,
    }
    # max items drained/selected per run_once() pass
    LOOP_LIMIT = 1000
    # seconds slept between run() iterations
    LOOP_INTERVAL = 0.1
    # per-project length of the 'active_tasks' history deque
    ACTIVE_TASKS = 100
    # max pending tasks per project queue; 0 disables the limit
    INQUEUE_LIMIT = 0
    # consecutive run_once() failures tolerated before run() exits
    EXCEPTION_LIMIT = 3
    # a STOPed project tagged 'delete' is dropped after this many seconds
    DELETE_TIME = 24 * 60 * 60
    # retry backoff in seconds keyed by retry count; '' is the fallback
    DEFAULT_RETRY_DELAY = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }
46
47
    def __init__(self, taskdb, projectdb, newtask_queue, status_queue,
                 out_queue, data_path='./data', resultdb=None):
        '''
        Wire the scheduler to its databases and queues and reset all
        in-memory state.

        taskdb        - task database (get_task/insert/update/status_count/...)
        projectdb     - project database (check_update/split_group/drop)
        newtask_queue - inbound queue of new tasks from processors
        status_queue  - inbound queue of task status packs
        out_queue     - outbound queue of tasks for the fetcher
        data_path     - directory used to persist counter dumps
        resultdb      - optional result database, only used on project delete
        '''
        self.taskdb = taskdb
        self.projectdb = projectdb
        self.resultdb = resultdb
        self.newtask_queue = newtask_queue
        self.status_queue = status_queue
        self.out_queue = out_queue
        self.data_path = data_path

        # tasks that could not be pushed to a full out_queue, retried later
        self._send_buffer = deque()
        self._quit = False
        # consecutive run_once() failures; run() exits past EXCEPTION_LIMIT
        self._exceptions = 0
        # project name -> cached project record (plus runtime extras)
        self.projects = dict()
        self._force_update_project = False
        self._last_update_project = 0
        # project name -> TaskQueue of pending taskids
        self.task_queue = dict()
        self._last_tick = int(time.time())

        # event counters at several time resolutions; 1h/1d/all are
        # persisted across restarts via _dump_cnt()
        self._cnt = {
            "5m_time": counter.CounterManager(
                lambda: counter.TimebaseAverageEventCounter(30, 10)),
            "5m": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(30, 10)),
            "1h": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            "1d": counter.CounterManager(
                lambda: counter.TimebaseAverageWindowCounter(10 * 60, 24 * 6)),
            "all": counter.CounterManager(
                lambda: counter.TotalCounter()),
        }
        self._cnt['1h'].load(os.path.join(self.data_path, 'scheduler.1h'))
        self._cnt['1d'].load(os.path.join(self.data_path, 'scheduler.1d'))
        self._cnt['all'].load(os.path.join(self.data_path, 'scheduler.all'))
        self._last_dump_cnt = 0
82
83
    def _update_projects(self):
84
        '''Check project update'''
85
        now = time.time()
86
        if (
87
                not self._force_update_project
88
                and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
89
        ):
90
            return
91
        for project in self.projectdb.check_update(self._last_update_project):
92
            self._update_project(project)
93
            logger.debug("project: %s updated.", project['name'])
94
        self._force_update_project = False
95
        self._last_update_project = now
96
97
    def _update_project(self, project):
        '''
        Apply one project record: refresh the cached info, sync the
        project's task queue with its status, and (when running) request
        project runtime info via a synthetic _on_get_info task.
        '''
        name = project['name']
        if name not in self.projects:
            self.projects[name] = {}
        self.projects[name].update(project)
        # md5 of the script travels with tasks so workers detect stale code
        self.projects[name]['md5sum'] = utils.md5string(project['script'])
        if not self.projects[name].get('active_tasks', None):
            self.projects[name]['active_tasks'] = deque(maxlen=self.ACTIVE_TASKS)

        # load task queue when project is running; drop it when stopped
        if project['status'] in ('RUNNING', 'DEBUG'):
            if name not in self.task_queue:
                self._load_tasks(name)
            self.task_queue[name].rate = project['rate']
            self.task_queue[name].burst = project['burst']

            # update project runtime info from processor by sending a
            # _on_get_info request; the result comes back through the
            # status queue in track.save
            self.on_select_task({
                'taskid': '_on_get_info',
                'project': name,
                'url': 'data:,_on_get_info',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': ['min_tick', 'retry_delay'],
                },
                'process': {
                    'callback': '_on_get_info',
                },
            })
        else:
            if name in self.task_queue:
                self.task_queue[name].rate = 0
                self.task_queue[name].burst = 0
                del self.task_queue[name]

            # fix: counters are keyed by project *name* (see _load_tasks);
            # the original tested the project dict itself, so this guard
            # never matched and counters were rebuilt on every update
            if name not in self._cnt['all']:
                self._update_project_cnt(name)
135
136
    # fields fetched from taskdb when (re)loading a project's task queue
    scheduler_task_fields = ['taskid', 'project', 'schedule', ]
137
138
    def _load_tasks(self, project):
        '''
        Load the project's ACTIVE tasks from taskdb into a fresh
        TaskQueue, sync the queue's rate/burst with the project status,
        and seed the lifetime counters.
        '''
        self.task_queue[project] = TaskQueue(rate=0, burst=0)
        for task in self.taskdb.load_tasks(
                self.taskdb.ACTIVE, project, self.scheduler_task_fields
        ):
            taskid = task['taskid']
            _schedule = task.get('schedule', self.default_schedule)
            priority = _schedule.get('priority', self.default_schedule['priority'])
            exetime = _schedule.get('exetime', self.default_schedule['exetime'])
            self.task_queue[project].put(taskid, priority, exetime)
        logger.debug('project: %s loaded %d tasks.', project, len(self.task_queue[project]))

        # only running/debugging projects may consume from the queue;
        # rate=burst=0 effectively pauses a stopped project's queue
        if self.projects[project]['status'] in ('RUNNING', 'DEBUG'):
            self.task_queue[project].rate = self.projects[project]['rate']
            self.task_queue[project].burst = self.projects[project]['burst']
        else:
            self.task_queue[project].rate = 0
            self.task_queue[project].burst = 0

        # seed lifetime counters once, then pin 'pending' to the real
        # queue length just loaded
        if project not in self._cnt['all']:
            self._update_project_cnt(project)
        self._cnt['all'].value((project, 'pending'), len(self.task_queue[project]))
161
162
    def _update_project_cnt(self, project):
163
        status_count = self.taskdb.status_count(project)
164
        self._cnt['all'].value(
165
            (project, 'success'),
166
            status_count.get(self.taskdb.SUCCESS, 0)
167
        )
168
        self._cnt['all'].value(
169
            (project, 'failed'),
170
            status_count.get(self.taskdb.FAILED, 0) + status_count.get(self.taskdb.BAD, 0)
171
        )
172
        self._cnt['all'].value(
173
            (project, 'pending'),
174
            status_count.get(self.taskdb.ACTIVE, 0)
175
        )
176
177
    def task_verify(self, task):
178
        '''
179
        return False if any of 'taskid', 'project', 'url' is not in task dict
180
                        or project in not in task_queue
181
        '''
182
        for each in ('taskid', 'project', 'url', ):
183
            if each not in task or not task[each]:
184
                logger.error('%s not in task: %.200r', each, task)
185
                return False
186
        if task['project'] not in self.task_queue:
187
            logger.error('unknown project: %s', task['project'])
188
            return False
189
        return True
190
191
    def insert_task(self, task):
192
        '''insert task into database'''
193
        return self.taskdb.insert(task['project'], task['taskid'], task)
194
195
    def update_task(self, task):
196
        '''update task in database'''
197
        return self.taskdb.update(task['project'], task['taskid'], task)
198
199
    def put_task(self, task):
200
        '''put task to task queue'''
201
        _schedule = task.get('schedule', self.default_schedule)
202
        self.task_queue[task['project']].put(
203
            task['taskid'],
204
            priority=_schedule.get('priority', self.default_schedule['priority']),
205
            exetime=_schedule.get('exetime', self.default_schedule['exetime'])
206
        )
207
208
    def send_task(self, task, force=True):
        '''
        Hand a task to the fetcher through out_queue.

        out_queue may be bounded: when it is full and *force* is set the
        task is parked in the send buffer for a later retry; otherwise
        the Queue.Full error propagates to the caller.
        '''
        try:
            self.out_queue.put_nowait(task)
        except Queue.Full:
            if not force:
                raise
            self._send_buffer.appendleft(task)
221
222
    def _check_task_done(self):
        '''
        Drain the status queue and apply each status pack.

        Returns the number of regular status packs processed
        (_on_get_info replies are consumed but not counted).
        '''
        cnt = 0
        try:
            while True:
                task = self.status_queue.get_nowait()
                # check _on_get_info result here: these replies carry
                # project runtime info (track.save) requested by
                # _update_project; merge it into the cached record
                # instead of treating it as a normal task
                if task.get('taskid') == '_on_get_info' and 'project' in task and 'track' in task:
                    self.projects[task['project']].update(task['track'].get('save') or {})
                    logger.info(
                        '%s on_get_info %r', task['project'], task['track'].get('save', {})
                    )
                    continue
                elif not self.task_verify(task):
                    continue
                self.on_task_status(task)
                cnt += 1
        except Queue.Empty:
            # queue drained
            pass
        return cnt
242
243
    # fields fetched from taskdb when merging a new request with an old task
    merge_task_fields = ['taskid', 'project', 'url', 'status', 'schedule', 'lastcrawltime']
244
245
    def _check_request(self):
        '''
        Drain up to LOOP_LIMIT new tasks from newtask_queue and feed the
        valid, deduplicated ones to on_request.

        Returns the number of distinct tasks handled.
        '''
        # NOTE(review): dedup below is keyed by taskid only — tasks with
        # the same taskid in different projects would collide; confirm
        # taskids are globally unique across projects.
        tasks = {}
        while len(tasks) < self.LOOP_LIMIT:
            try:
                task = self.newtask_queue.get_nowait()
            except Queue.Empty:
                break

            # the queue may carry a single task or a batch (list) of them
            if isinstance(task, list):
                _tasks = task
            else:
                _tasks = (task, )

            for task in _tasks:
                if not self.task_verify(task):
                    continue

                # already pending in the project queue: keep the queued
                # version unless the new request forces an update
                if task['taskid'] in self.task_queue[task['project']]:
                    if not task.get('schedule', {}).get('force_update', False):
                        logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
                        continue

                # duplicate within this batch: first one wins unless forced
                if task['taskid'] in tasks:
                    if not task.get('schedule', {}).get('force_update', False):
                        continue

                tasks[task['taskid']] = task

        for task in itervalues(tasks):
            self.on_request(task)

        return len(tasks)
278
279
    def _check_cronjob(self):
        """Check projects cronjob tick; return True when a new tick was sent."""
        now = time.time()
        self._last_tick = int(self._last_tick)
        if now - self._last_tick < 1:
            # less than one second since the last tick; nothing to do
            return False
        # advance exactly one second per call so no tick is skipped;
        # run_once() keeps calling until we catch up with wall time
        self._last_tick += 1
        for project in itervalues(self.projects):
            if project['status'] not in ('DEBUG', 'RUNNING'):
                continue
            # min_tick is the project's smallest cron interval (reported
            # via _on_get_info); 0 means the project has no cronjob
            if project.get('min_tick', 0) == 0:
                continue
            if self._last_tick % int(project['min_tick']) != 0:
                continue
            # synthesize an _on_cronjob task carrying the tick value
            self.on_select_task({
                'taskid': '_on_cronjob',
                'project': project['name'],
                'url': 'data:,_on_cronjob',
                'status': self.taskdb.SUCCESS,
                'fetch': {
                    'save': {
                        'tick': self._last_tick,
                    },
                },
                'process': {
                    'callback': '_on_cronjob',
                },
            })
        return True
308
309
    # fields fetched from taskdb when dispatching a selected task to the fetcher
    request_task_fields = [
        'taskid',
        'project',
        'url',
        'status',
        'schedule',
        'fetch',
        'process',
        'track',
        'lastcrawltime'
    ]
320
321
    def _check_select(self):
        '''
        Select pending tasks and dispatch them to fetch & process.

        First flushes tasks parked in the send buffer, then pulls up to
        LOOP_LIMIT taskids from the per-project queues (at most
        LOOP_LIMIT/10 per project per round) and loads/sends each one.

        Returns a dict of project -> number of tasks selected.
        '''
        while self._send_buffer:
            _task = self._send_buffer.pop()
            try:
                # use force=False here to prevent automatic send_buffer append and get exception
                self.send_task(_task, False)
            except Queue.Full:
                # still full; put the task back and stop flushing for now
                self._send_buffer.append(_task)
                break

        if self.out_queue.full():
            return {}

        taskids = []
        cnt = 0
        cnt_dict = dict()
        limit = self.LOOP_LIMIT
        for project, task_queue in iteritems(self.task_queue):
            if cnt >= limit:
                break

            # task queue
            self.task_queue[project].check_update()
            project_cnt = 0

            # check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
            while cnt < limit and project_cnt < limit / 10:
                taskid = task_queue.get()
                if not taskid:
                    break

                taskids.append((project, taskid))
                project_cnt += 1
                cnt += 1
            cnt_dict[project] = project_cnt

        # load full task records only after selection, in a second pass
        for project, taskid in taskids:
            self._load_put_task(project, taskid)

        return cnt_dict
362
363
    def _load_put_task(self, project, taskid):
364
        task = self.taskdb.get_task(project, taskid, fields=self.request_task_fields)
365
        if not task:
366
            return
367
        task = self.on_select_task(task)
368
369
    def _print_counter_log(self):
        '''
        Log a one-line 5-minute summary: totals for
        pending/success/retry/failed, plus per-project figures for the 2
        most-failing projects and up to 3 of the most active others.
        '''
        keywords = ('pending', 'success', 'retry', 'failed')
        total_cnt = {}
        project_actives = []
        project_fails = []
        for key in keywords:
            total_cnt[key] = 0
        for project, subcounter in iteritems(self._cnt['5m']):
            actives = 0
            for key in keywords:
                cnt = subcounter.get(key, None)
                if cnt:
                    cnt = cnt.sum
                    total_cnt[key] += cnt
                    actives += cnt

            project_actives.append((actives, project))

            fails = subcounter.get('failed', None)
            if fails:
                project_fails.append((fails.sum, project))

        top_2_fails = sorted(project_fails, reverse=True)[:2]
        # fix: compare project names against names — the original tested
        # `x[1] not in top_2_fails`, i.e. a name against (sum, name)
        # tuples, which never matched, so a failing project could be
        # listed twice in the summary
        fail_names = set(name for _, name in top_2_fails)
        top_3_actives = sorted([x for x in project_actives if x[1] not in fail_names],
                               reverse=True)[:5 - len(top_2_fails)]

        log_str = ("in 5m: new:%(pending)d,success:%(success)d,"
                   "retry:%(retry)d,failed:%(failed)d" % total_cnt)
        for _, project in itertools.chain(top_3_actives, top_2_fails):
            subcounter = self._cnt['5m'][project].to_dict(get_value='sum')
            log_str += " %s:%d,%d,%d,%d" % (project,
                                            subcounter.get('pending', 0),
                                            subcounter.get('success', 0),
                                            subcounter.get('retry', 0),
                                            subcounter.get('failed', 0))
        logger.info(log_str)
406
407
    def _dump_cnt(self):
408
        '''Dump counters to file'''
409
        self._cnt['1h'].dump(os.path.join(self.data_path, 'scheduler.1h'))
410
        self._cnt['1d'].dump(os.path.join(self.data_path, 'scheduler.1d'))
411
        self._cnt['all'].dump(os.path.join(self.data_path, 'scheduler.all'))
412
413
    def _try_dump_cnt(self):
414
        '''Dump counters every 60 seconds'''
415
        now = time.time()
416
        if now - self._last_dump_cnt > 60:
417
            self._last_dump_cnt = now
418
            self._dump_cnt()
419
            self._print_counter_log()
420
421
    def _check_delete(self):
        '''
        Drop projects that are STOPed, tagged 'delete' in their group,
        and untouched for at least DELETE_TIME seconds: remove them from
        memory and from taskdb/projectdb (and resultdb when present).
        '''
        now = time.time()
        # iterate a copy because entries are deleted during iteration
        for project in list(itervalues(self.projects)):
            if project['status'] != 'STOP':
                continue
            if now - project['updatetime'] < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project['group']):
                continue

            logger.warning("deleting project: %s!", project['name'])
            if project['name'] in self.task_queue:
                # pause the queue before dropping it
                self.task_queue[project['name']].rate = 0
                self.task_queue[project['name']].burst = 0
                del self.task_queue[project['name']]
            del self.projects[project['name']]
            self.taskdb.drop(project['name'])
            self.projectdb.drop(project['name'])
            if self.resultdb:
                self.resultdb.drop(project['name'])
442
443
    def __len__(self):
        '''Total number of queued tasks across every project.'''
        total = 0
        for task_queue in itervalues(self.task_queue):
            total += len(task_queue)
        return total
445
446
    def quit(self):
447
        '''Set quit signal'''
448
        self._quit = True
449
450
    def run_once(self):
        '''Consume queues and feed tasks to the fetcher, once.'''

        self._update_projects()
        self._check_task_done()
        self._check_request()
        # catch up one cron tick per call until we reach wall time
        while self._check_cronjob():
            pass
        self._check_select()
        self._check_delete()
        self._try_dump_cnt()
461
462
    def run(self):
        '''Start scheduler loop; returns after quit() or too many errors.'''
        logger.info("loading projects")

        while not self._quit:
            try:
                time.sleep(self.LOOP_INTERVAL)
                self.run_once()
                # a successful iteration resets the failure streak
                self._exceptions = 0
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                # bail out after EXCEPTION_LIMIT consecutive failures
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("scheduler exiting...")
        # persist counters so they survive the restart
        self._dump_cnt()
482
483
    def trigger_on_start(self, project):
484
        '''trigger an on_start callback of project'''
485
        self.newtask_queue.put({
486
            "project": project,
487
            "taskid": "on_start",
488
            "url": "data:,on_start",
489
            "process": {
490
                "callback": "on_start",
491
            },
492
        })
493
494
    def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
        '''
        Start the XML-RPC control interface and serve until quit().

        Registered methods: _quit, size, counter, newtask, send_task,
        update_project, get_active_tasks.
        '''
        try:
            from six.moves.xmlrpc_server import SimpleXMLRPCServer
        except ImportError:
            from SimpleXMLRPCServer import SimpleXMLRPCServer

        server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
        server.register_introspection_functions()
        server.register_multicall_functions()

        server.register_function(self.quit, '_quit')
        server.register_function(self.__len__, 'size')

        def dump_counter(_time, _type):
            # return a counter snapshot; None (logged) on bad arguments
            try:
                return self._cnt[_time].to_dict(_type)
            # fix: narrowed the bare `except:`, which also swallowed
            # SystemExit / KeyboardInterrupt
            except Exception:
                logger.exception('')
        server.register_function(dump_counter, 'counter')

        def new_task(task):
            if self.task_verify(task):
                self.newtask_queue.put(task)
                return True
            return False
        server.register_function(new_task, 'newtask')

        def send_task(task):
            '''dispatch task to fetcher'''
            self.send_task(task)
            return True
        server.register_function(send_task, 'send_task')

        def update_project():
            self._force_update_project = True
        server.register_function(update_project, 'update_project')

        def get_active_tasks(project=None, limit=100):
            # whitelist of task keys exposed over the wire
            allowed_keys = set((
                'taskid',
                'project',
                'status',
                'url',
                'lastcrawltime',
                'updatetime',
                'track',
            ))
            track_allowed_keys = set((
                'ok',
                'time',
                'follows',
                'status_code',
            ))

            # merge the per-project active_tasks histories, newest first
            iters = [iter(x['active_tasks']) for k, x in iteritems(self.projects)
                     if x and (k == project if project else True)]
            tasks = [next(x, None) for x in iters]
            result = []

            while len(result) < limit and tasks and not all(x is None for x in tasks):
                updatetime, task = t = max(t for t in tasks if t)
                i = tasks.index(t)
                tasks[i] = next(iters[i], None)
                # strip non-whitelisted keys before serializing
                for key in list(task):
                    if key == 'track':
                        for k in list(task[key].get('fetch', [])):
                            if k not in track_allowed_keys:
                                del task[key]['fetch'][k]
                        for k in list(task[key].get('process', [])):
                            if k not in track_allowed_keys:
                                del task[key]['process'][k]
                    if key in allowed_keys:
                        continue
                    del task[key]
                result.append(t)
            # fix for "<type 'exceptions.TypeError'>:dictionary key must be string"
            # have no idea why
            return json.loads(json.dumps(result))
        server.register_function(get_active_tasks, 'get_active_tasks')

        # short timeout keeps the loop responsive to quit()
        server.timeout = 0.5
        while not self._quit:
            server.handle_request()
        server.server_close()
579
580
    def on_request(self, task):
581
        if self.INQUEUE_LIMIT and len(self.task_queue[task['project']]) >= self.INQUEUE_LIMIT:
582
            logger.debug('overflow task %(project)s:%(taskid)s %(url)s', task)
583
            return
584
585
        oldtask = self.taskdb.get_task(task['project'], task['taskid'],
586
                                       fields=self.merge_task_fields)
587
        if oldtask:
588
            return self.on_old_request(task, oldtask)
589
        else:
590
            return self.on_new_request(task)
591
592
    def on_new_request(self, task):
        '''Handle a request whose taskid has never been seen: store it,
        queue it, and count it as pending.'''
        task['status'] = self.taskdb.ACTIVE
        self.insert_task(task)
        self.put_task(task)

        project = task['project']
        for span in ('5m', '1h', '1d', 'all'):
            self._cnt[span].event((project, 'pending'), +1)
        logger.info('new task %(project)s:%(taskid)s %(url)s', task)
        return task
605
606
    def on_old_request(self, task, old_task):
        '''
        Called when a request arrives for a taskid that already exists
        in taskdb.

        The task is restarted when any of these holds:
        - its itag changed since the last crawl,
        - its age expired (lastcrawltime + age is in the past),
        - force_update is set in its schedule.
        Otherwise the request is ignored.
        '''
        now = time.time()

        _schedule = task.get('schedule', self.default_schedule)
        old_schedule = old_task.get('schedule', {})

        restart = False
        schedule_age = _schedule.get('age', self.default_schedule['age'])
        if _schedule.get('itag') and _schedule['itag'] != old_schedule.get('itag'):
            restart = True
        elif schedule_age >= 0 and schedule_age + (old_task.get('lastcrawltime', 0) or 0) < now:
            restart = True
        elif _schedule.get('force_update'):
            restart = True

        if not restart:
            logger.debug('ignore newtask %(project)s:%(taskid)s %(url)s', task)
            return

        task['status'] = self.taskdb.ACTIVE
        self.update_task(task)
        self.put_task(task)

        project = task['project']
        # the task is only newly-pending if it was not already ACTIVE
        if old_task['status'] != self.taskdb.ACTIVE:
            self._cnt['5m'].event((project, 'pending'), +1)
            self._cnt['1h'].event((project, 'pending'), +1)
            self._cnt['1d'].event((project, 'pending'), +1)
        # move the lifetime counter from its old bucket back to 'pending'
        if old_task['status'] == self.taskdb.SUCCESS:
            self._cnt['all'].event((project, 'success'), -1).event((project, 'pending'), +1)
        elif old_task['status'] == self.taskdb.FAILED:
            self._cnt['all'].event((project, 'failed'), -1).event((project, 'pending'), +1)
        logger.info('restart task %(project)s:%(taskid)s %(url)s', task)
        return task
641
642
    def on_task_status(self, task):
643
        '''Called when a status pack is arrived'''
644
        try:
645
            procesok = task['track']['process']['ok']
646
            if not self.task_queue[task['project']].done(task['taskid']):
647
                logging.error('not processing pack: %(project)s:%(taskid)s %(url)s', task)
648
                return None
649
        except KeyError as e:
650
            logger.error("Bad status pack: %s", e)
651
            return None
652
653
        if procesok:
654
            ret = self.on_task_done(task)
655
        else:
656
            ret = self.on_task_failed(task)
657
658
        if task['track']['fetch'].get('time'):
659
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
660
                                       task['track']['fetch']['time'])
661
        if task['track']['process'].get('time'):
662
            self._cnt['5m_time'].event((task['project'], 'process_time'),
663
                                       task['track']['process'].get('time'))
664
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
665
        return ret
666
667
    def on_task_done(self, task):
        '''Handle a successfully processed task (called by on_task_status):
        persist SUCCESS, requeue when auto_recrawl, and bump counters.'''
        task['status'] = self.taskdb.SUCCESS
        task['lastcrawltime'] = time.time()

        if 'schedule' in task:
            schedule = task['schedule']
            if schedule.get('auto_recrawl') and 'age' in schedule:
                # auto_recrawl keeps the task alive: schedule the next
                # round one age-interval from now
                task['status'] = self.taskdb.ACTIVE
                schedule['exetime'] = time.time() + schedule.get('age')
                self.put_task(task)
            else:
                del task['schedule']
        self.update_task(task)

        project = task['project']
        for span in ('5m', '1h', '1d'):
            self._cnt[span].event((project, 'success'), +1)
        self._cnt['all'].event((project, 'success'), +1).event((project, 'pending'), -1)
        logger.info('task done %(project)s:%(taskid)s %(url)s', task)
        return task
689
690
    def on_task_failed(self, task):
        '''
        Called when a task failed processing (called by on_task_status).

        Retries with a per-project (or default) backoff delay until
        retries are exhausted, then marks the task FAILED.
        '''

        if 'schedule' not in task:
            # the status pack lost its schedule; recover it from taskdb
            old_task = self.taskdb.get_task(task['project'], task['taskid'], fields=['schedule'])
            if old_task is None:
                # fix: use the module logger with lazy %-args instead of
                # the root logger with eager formatting
                logger.error('unknown status pack: %s', task)
                return
            task['schedule'] = old_task.get('schedule', {})

        retries = task['schedule'].get('retries', self.default_schedule['retries'])
        retried = task['schedule'].get('retried', 0)

        # backoff table: project-supplied retry_delay wins over defaults;
        # '' is the fallback entry for retry counts beyond the table
        project_info = self.projects.get(task['project'], {})
        retry_delay = project_info.get('retry_delay', None) or self.DEFAULT_RETRY_DELAY
        next_exetime = retry_delay.get(retried, retry_delay.get('', self.DEFAULT_RETRY_DELAY['']))

        if task['schedule'].get('auto_recrawl') and 'age' in task['schedule']:
            # auto_recrawl tasks never die; retry no later than their age
            next_exetime = min(next_exetime, task['schedule'].get('age'))
        else:
            if retried >= retries:
                # out of retries: negative exetime signals permanent failure
                next_exetime = -1
            elif 'age' in task['schedule'] and next_exetime > task['schedule'].get('age'):
                next_exetime = task['schedule'].get('age')

        if next_exetime < 0:
            task['status'] = self.taskdb.FAILED
            task['lastcrawltime'] = time.time()
            self.update_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'failed'), +1)
            self._cnt['1h'].event((project, 'failed'), +1)
            self._cnt['1d'].event((project, 'failed'), +1)
            self._cnt['all'].event((project, 'failed'), +1).event((project, 'pending'), -1)
            # fix: pass the task as a lazy logging argument instead of
            # eager % formatting
            logger.info('task failed %(project)s:%(taskid)s %(url)s', task)
            return task
        else:
            task['schedule']['retried'] = retried + 1
            task['schedule']['exetime'] = time.time() + next_exetime
            task['lastcrawltime'] = time.time()
            self.update_task(task)
            self.put_task(task)

            project = task['project']
            self._cnt['5m'].event((project, 'retry'), +1)
            self._cnt['1h'].event((project, 'retry'), +1)
            self._cnt['1d'].event((project, 'retry'), +1)
            # self._cnt['all'].event((project, 'retry'), +1)
            logger.info('task retry %d/%d %%(project)s:%%(taskid)s %%(url)s' % (
                retried, retries), task)
            return task
742
743
    def on_select_task(self, task):
        '''Enrich a selected task with project metadata, record it in the
        project's active-tasks history, and hand it to the fetcher.'''
        logger.info('select %(project)s:%(taskid)s %(url)s', task)

        info = self.projects.get(task['project'])
        assert info, 'no such project'
        # md5sum/updatetime let workers detect stale project scripts
        task['group'] = info.get('group')
        task['project_md5sum'] = info.get('md5sum')
        task['project_updatetime'] = info.get('updatetime', 0)
        info['active_tasks'].appendleft((time.time(), task))
        self.send_task(task)
        return task

from tornado import gen

class OneScheduler(Scheduler):
    """
    Scheduler Mixin class for one mode

    Overrides the send_task method:
    calls processor.on_task(fetcher.fetch(task)) inline instead of
    dispatching the task through the message queues.
    """

    def _check_select(self):
        """
        interactive mode of select tasks
        """
        if not self.interactive:
            return super(OneScheduler, self)._check_select()

        # waiting for running tasks
        if self.running_task > 0:
            return

        # one truthy marker is appended per crawled task; quit_pyspider()
        # empties it so the ioloop is stopped after the shell exits
        is_crawled = []

        def run(project=None):
            return crawl('on_start', project=project)

        def crawl(url, project=None, **kwargs):
            """
            Crawl given url, same parameters as BaseHandler.crawl

            url - url or taskid, parameters will be used if in taskdb
            project - can be ignored if only one project exists.
            """

            # looking up the project instance
            if project is None:
                if len(self.projects) == 1:
                    project = list(self.projects.keys())[0]
                else:
                    raise LookupError('You need specify the project: %r'
                                      % list(self.projects.keys()))
            project_data = self.processor.project_manager.get(project)
            if not project_data:
                raise LookupError('no such project: %s' % project)

            # build the task package from the project's handler instance
            instance = project_data['instance']
            instance._reset()
            task = instance.crawl(url, **kwargs)
            if isinstance(task, list):
                raise Exception('url list is not allowed in interactive mode')

            # without explicit parameters, prefer the stored task from
            # taskdb: look up by taskid first, then by url
            if not kwargs:
                dbtask = self.taskdb.get_task(task['project'], task['taskid'],
                                              fields=self.request_task_fields)
                if not dbtask:
                    dbtask = self.taskdb.get_task(task['project'], task['url'],
                                                  fields=self.request_task_fields)
                if dbtask:
                    task = dbtask

            # select the task
            self.on_select_task(task)
            is_crawled.append(True)

            shell.ask_exit()

        def quit_interactive():
            '''Quit interactive mode'''
            is_crawled.append(True)
            self.interactive = False
            shell.ask_exit()

        def quit_pyspider():
            '''Close pyspider'''
            is_crawled[:] = []
            shell.ask_exit()

        shell = utils.get_python_console()
        shell.interact(
            'pyspider shell - Select task\n'
            'crawl(url, project=None, **kwargs) - same parameters as BaseHandler.crawl\n'
            'quit_interactive() - Quit interactive mode\n'
            'quit_pyspider() - Close pyspider'
        )
        if not is_crawled:
            self.ioloop.stop()

    def __getattr__(self, name):
        """patch for crawl(url, callback=self.index_page) API"""
        if self.interactive:
            return name
        raise AttributeError(name)

    def on_task_status(self, task):
        """Ignore not processing error in interactive mode"""
        if not self.interactive:
            # BUGFIX: return after delegating -- previously execution fell
            # through and the task was counted and processed a second time
            return super(OneScheduler, self).on_task_status(task)

        try:
            process_ok = task['track']['process']['ok']
        except KeyError as e:
            logger.error("Bad status pack: %s", e)
            return None

        if process_ok:
            ret = self.on_task_done(task)
        else:
            ret = self.on_task_failed(task)

        # record fetch/process timing counters for the dashboard
        if task['track']['fetch'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'fetch_time'),
                                       task['track']['fetch']['time'])
        if task['track']['process'].get('time'):
            self._cnt['5m_time'].event((task['project'], 'process_time'),
                                       task['track']['process'].get('time'))
        self.projects[task['project']]['active_tasks'].appendleft((time.time(), task))
        return ret

    def init_one(self, ioloop, fetcher, processor,
                 result_worker=None, interactive=False):
        """Wire up the in-process components used by one mode."""
        self.ioloop = ioloop
        self.fetcher = fetcher
        self.processor = processor
        self.result_worker = result_worker
        self.interactive = interactive
        self.running_task = 0

    @gen.coroutine
    def do_task(self, task):
        """Fetch & process one task inline, then drain the processor's
        follow-up and result queues."""
        self.running_task += 1
        result = yield gen.Task(self.fetcher.fetch, task)
        # renamed from `type` to avoid shadowing the builtin
        _type, task, response = result.args
        self.processor.on_task(task, response)
        # do with message
        while not self.processor.inqueue.empty():
            _task, _response = self.processor.inqueue.get()
            self.processor.on_task(_task, _response)
        # do with results
        while not self.processor.result_queue.empty():
            _task, _result = self.processor.result_queue.get()
            if self.result_worker:
                self.result_worker.on_result(_task, _result)
        self.running_task -= 1

    def send_task(self, task, force=True):
        """Run the task inline via do_task; when the fetcher has no free
        connection slots, buffer it (force=True) or raise Queue.Full."""
        if self.fetcher.http_client.free_size() <= 0:
            if force:
                # NOTE(review): the task is buffered AND still scheduled
                # below, mirroring the original control flow -- confirm
                # whether falling through here is intended
                self._send_buffer.appendleft(task)
            else:
                # BUGFIX: was `raise self.outqueue.Full` -- the scheduler
                # has no `outqueue` attribute (only `out_queue`), so this
                # branch raised AttributeError instead of the intended Full
                raise Queue.Full
        self.ioloop.add_future(self.do_task(task), lambda x: x.result())

    def run(self):
        import tornado.ioloop
        tornado.ioloop.PeriodicCallback(self.run_once, 100,
                                        io_loop=self.ioloop).start()
        self.ioloop.start()

    def quit(self):
        self.ioloop.stop()
        logger.info("scheduler exiting...")
921
922
923
import random
924
import hashlib
925
import threading
926
927
928
class ThreadBaseScheduler(Scheduler):
    """Scheduler variant that runs db-touching callbacks on a pool of
    worker threads, each holding its own (thread-local) db connections."""

    def __init__(self, threads=4, *args, **kwargs):
        self.threads = threads
        self.local = threading.local()

        super(ThreadBaseScheduler, self).__init__(*args, **kwargs)

        # remember the primary connections; each worker thread copy()s
        # them in _thread_worker to obtain private handles
        self._taskdb = self.taskdb
        self._projectdb = self.projectdb
        self._resultdb = self.resultdb

        self.thread_objs = []
        self.thread_queues = []
        self._start_threads()
        assert len(self.thread_queues) > 0

    @property
    def taskdb(self):
        # thread-local view of the task database connection
        return self.local.taskdb

    @taskdb.setter
    def taskdb(self, taskdb):
        self.local.taskdb = taskdb

    @property
    def projectdb(self):
        # thread-local view of the project database connection
        return self.local.projectdb

    @projectdb.setter
    def projectdb(self, projectdb):
        self.local.projectdb = projectdb

    @property
    def resultdb(self):
        # thread-local view of the result database connection
        return self.local.resultdb

    @resultdb.setter
    def resultdb(self, resultdb):
        self.local.resultdb = resultdb

    @staticmethod
    def _hash_index(key):
        """Stable 0-255 bucket for a taskid, used to pin a task to one
        worker thread so its callbacks stay ordered per task.

        BUGFIX: the previous `ord(hashlib.md5(key).digest()[-1])` broke
        on Python 3 (md5 requires bytes, and indexing bytes already
        yields an int); encoding the key and indexing a bytearray gives
        the identical value on both Python 2 and 3.
        """
        if not isinstance(key, bytes):
            key = key.encode('utf8')
        return bytearray(hashlib.md5(key).digest())[-1]

    def _start_threads(self):
        """Spawn the daemon worker threads, one work queue each."""
        for i in range(self.threads):
            queue = Queue.Queue()
            thread = threading.Thread(target=self._thread_worker, args=(queue, ))
            thread.daemon = True
            thread.start()
            self.thread_objs.append(thread)
            self.thread_queues.append(queue)

    def _thread_worker(self, queue):
        """Worker loop: set up private db connections, then execute
        queued (method, args, kwargs) jobs forever."""
        self.taskdb = self._taskdb.copy()
        self.projectdb = self._projectdb.copy()
        # BUGFIX: resultdb is optional (defaults to None in
        # Scheduler.__init__); calling .copy() on None crashed every
        # worker thread at startup when no resultdb was configured
        self.resultdb = self._resultdb.copy() if self._resultdb is not None else None

        while True:
            method, args, kwargs = queue.get()
            try:
                method(*args, **kwargs)
            except Exception as e:
                logger.exception(e)

    def _run_in_thread(self, method, *args, **kwargs):
        """Submit `method` for execution on a worker thread.

        _i     -- pin the job to thread `_i % threads` (per-key ordering)
        _block -- wait for an idle thread and drain all queues afterwards
        """
        i = kwargs.pop('_i', None)
        block = kwargs.pop('_block', False)

        if i is None:
            # prefer an idle thread; otherwise (unless blocking) fall
            # back to a randomly chosen one
            while True:
                for queue in self.thread_queues:
                    if queue.empty():
                        break
                else:
                    if block:
                        time.sleep(0.1)
                        continue
                    else:
                        queue = self.thread_queues[random.randint(0, len(self.thread_queues)-1)]
                break
        else:
            queue = self.thread_queues[i % len(self.thread_queues)]

        queue.put((method, args, kwargs))

        if block:
            self._wait_thread()

    def _wait_thread(self):
        """Busy-wait until every worker queue is drained."""
        while True:
            if all(queue.empty() for queue in self.thread_queues):
                break
            time.sleep(0.1)

    def _update_project(self, project):
        # project updates may hit the db, so run on any worker thread
        self._run_in_thread(Scheduler._update_project, self, project)

    def on_task_status(self, task):
        # pin to one thread per taskid to keep status updates ordered
        self._run_in_thread(Scheduler.on_task_status, self, task,
                            _i=self._hash_index(task['taskid']))

    def on_request(self, task):
        # pin to one thread per taskid to keep request handling ordered
        self._run_in_thread(Scheduler.on_request, self, task,
                            _i=self._hash_index(task['taskid']))

    def _load_put_task(self, project, taskid):
        self._run_in_thread(Scheduler._load_put_task, self, project, taskid,
                            _i=self._hash_index(taskid))

    def run_once(self):
        super(ThreadBaseScheduler, self).run_once()
        self._wait_thread()