chaoswg.scheduler   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 37
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 37
rs 10
c 0
b 0
f 0
wmc 7

3 Methods

Rating   Name   Duplication   Size   Complexity  
A TaskScheduler.run() 0 4 2
A TaskScheduler.__init__() 0 6 1
A TaskScheduler.schedule_tasks() 0 13 4
1
import threading
2
import time
3
from datetime import datetime
4
5
import schedule
6
7
from chaoswg import Task
8
9
10
class TaskScheduler(threading.Thread):
0 ignored issues
show
Unused Code introduced by
The variable __class__ seems to be unused.
Loading history...
11
12
    def __init__(self, interval=60):
13
        # init a daemonized thread
14
        threading.Thread.__init__(self, daemon=True)
15
        self.interval = interval
16
        # run an update every hour
17
        schedule.every().hour.do(self.schedule_tasks)
18
19
    @staticmethod
20
    def schedule_tasks():
21
        now = datetime.utcnow()
22
        schedule_tasks = Task.get_schedule_tasks()
23
24
        for task in schedule_tasks:
25
            if task['last_done'] is not None:
26
                delta = now - task['last_done']
27
                if delta.days >= task['schedule_days']:
28
                    Task.set_todo(task['id'])
29
            else:
30
                # Task was not done yet
31
                Task.set_todo(task['id'])
32
33
    def run(self):
34
        while True:
35
            schedule.run_pending()
36
            time.sleep(self.interval)
37