Passed
Push — master ( aeb165...d9ae97 )
by Dean
03:04
created

Scheduler.process_job()   B

Complexity

Conditions 4

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20
Metric Value
dl 0
loc 23
ccs 0
cts 15
cp 0
rs 8.7972
cc 4
crap 20
1
from plugin.models import SchedulerJob
2
from plugin.modules.core.base import Module
3
from plugin.modules.scheduler.handlers import BackupMaintenanceIntervalHandler, SyncIntervalHandler
4
5
from datetime import datetime
6
from threading import Thread
7
import logging
8
import time
9
10
# Module-level logger, named after this module
log = logging.getLogger(__name__)

# Handler classes available to the scheduler; each one exposes a `key`
# attribute that is matched against a job's `task_key`.
HANDLERS = [
    BackupMaintenanceIntervalHandler,
    SyncIntervalHandler,
]
16
17
18
class Scheduler(Module):
    """Background module that periodically runs due `SchedulerJob` records.

    A daemon thread polls for jobs whose `due_at` has passed, dispatches each
    one to the handler registered for its `task_key`, and then reschedules it.
    """

    __key__ = 'scheduler'

    # Map of handler key -> handler class (handlers without a key are skipped).
    # A generator expression is used so the loop variable does not end up as a
    # class attribute on Python 2.
    handlers = dict(
        (h.key, h) for h in HANDLERS
        if h.key is not None
    )

    def __init__(self):
        """Create the (not yet started) daemon worker thread."""
        self._running = False

        self._thread = Thread(target=self.run, name='Scheduler')
        self._thread.daemon = True

    def start(self):
        """Mark the scheduler as running and start the worker thread."""
        self._running = True
        self._thread.start()

        log.debug('Started')

    def run(self):
        """Thread target: process due jobs, then sleep, while running."""
        while self._running:
            # Process batch. An unexpected error here must not terminate the
            # scheduler thread, so it is logged and retried on the next cycle.
            try:
                self.process()
            except Exception as ex:
                log.error('Exception raised while processing jobs: %s', ex, exc_info=True)

            # Wait 60s before re-checking
            time.sleep(60)

    def process(self):
        """Run every job whose `due_at` is at or before the current UTC time."""
        # Retrieve due jobs
        now = datetime.utcnow()

        jobs = list(SchedulerJob.select().where(
            SchedulerJob.due_at <= now
        ))

        if not jobs:
            return

        log.debug('Processing %s job(s)', len(jobs))

        for job in jobs:
            # Process job
            update = self.process_job(job)

            # Update job status
            self.finish(job, update)

        log.debug('Complete')

    def process_job(self, job):
        """Run the handler registered for `job.task_key`.

        :param job: the due `SchedulerJob` to run
        :return: value passed to `finish()` as `update`; `True` for jobs of
                 deleted accounts and for handlers that raised, `False` when
                 the job was deleted because its task key is unknown,
                 otherwise whatever the handler's `run(job)` returns
        """
        if job.account.deleted:
            # Ignore scheduled jobs for deleted accounts
            return True

        log.info('Running job: %r', job)

        # Retrieve handler for job
        handler = self.handlers.get(job.task_key)

        if handler is None:
            log.info('Deleting job with unknown task key: %r', job.task_key)
            job.delete_instance()
            return False

        # Run handler
        try:
            h = handler()

            return h.run(job)
        except Exception as ex:
            # A failing handler still gets rescheduled (returns True)
            log.error('Exception raised in job handler: %s', ex, exc_info=True)
            return True

    @staticmethod
    def finish(job, update=True):
        """Record the run time and reschedule `job`, unless `update` is falsy.

        :param job: the `SchedulerJob` that was just processed
        :param update: when falsy, the job is left untouched
        """
        if not update:
            return

        # Update status
        job.ran_at = datetime.utcnow()
        job.due_at = job.next_at()

        # Save changes
        job.save()
103