File.getTasksCounters()   F
last analyzed

Complexity

Conditions 9

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 26
rs 3
1
import logging
2
import os
3
4
from PyQt5 import QtCore
5
6
from qtodotxt2.lib.filters import DueTodayFilter, DueTomorrowFilter, DueThisWeekFilter, DueThisMonthFilter, DueOverdueFilter
7
from qtodotxt2.lib.tasklib import Task
8
9
logger = logging.getLogger(__name__)
10
11
12
class File(QtCore.QObject):
13
14
    fileExternallyModified = QtCore.pyqtSignal()
15
    fileModified = QtCore.pyqtSignal(bool)
16
17
    def __init__(self):
18
        QtCore.QObject.__init__(self)
19
        self.newline = '\n'
20
        self.tasks = []
21
        self.filename = ''
22
        self._fileObserver = FileObserver()
23
        self._fileObserver.fileChangetSig.connect(self.fileExternallyModified)
24
        self.modified = False
25
26
    def __str__(self):
27
        return "File(filename:{}, tasks:{})".format(self.filename, self.tasks)
28
29
    __repr__ = __str__
30
31
    def load(self, filename):
32
        self._fileObserver.clear()
33
        with open(filename, 'rt', encoding='utf-8') as fd:
34
            lines = fd.readlines()
35
        self.filename = filename
36
        self._createTasksFromLines(lines)
37
        self._fileObserver.addPath(self.filename)
38
39
    def _createTasksFromLines(self, lines):
40
        self.tasks = []
41
        for line in lines:
42
            task_text = line.strip()
43
            if task_text:
44
                task = Task(task_text)
45
                self.tasks.append(task)
46
                task.modified.connect(self._taskModified)
47
48
    def _taskModified(self, task):
49
        self.setModified(True)
50
        #if task not in self.tasks:
51
            #self.tasks.append(task)
52
        if not task.text:
53
            self.deleteTask(task)
54
55
    def setModified(self, val):
56
        self.modified = val
57
        self.fileModified.emit(val)
58
59
    def deleteTask(self, task):
60
        self.tasks.remove(task)
61
        self.setModified(True)
62
63
    def addTask(self, task):
64
        self.tasks.append(task)
65
        task.modified.connect(self._taskModified)
66
        self.setModified(True)
67
68
    def connectTask(self, task):
69
        task.modified.connect(self._taskModified)
70
71
    def save(self, filename=''):
72
#        logger.debug('File.save called with filename="%s"', filename)
73
        self._fileObserver.clear()
74
        if not filename and not self.filename:
75
            self.filename = self._createNewFilename()
76
        elif filename:
77
            self.filename = filename
78
        self.tasks = sorted(self.tasks)  # we sort for users using simple text editors
79
        self._saveTasks()
80
        self.modified = False
81
        self.fileModified.emit(False)
82
        self._fileObserver.addPath(self.filename)
83
84
    @staticmethod
85
    def _createNewFilename():
86
        newFileName = os.path.expanduser('~/todo.txt')
87
        if not os.path.isfile(newFileName):
88
            return newFileName
89
        for counter in range(0, 100):
90
            newFileName = os.path.expanduser('~/todo.{}.txt.'.format(counter))
91
            if not os.path.isfile(newFileName):
92
                return newFileName
93
        return os.path.expanduser('~/todo.0.txt')
94
95
    def _saveTasks(self):
96
        with open(self.filename, 'wt', encoding='utf-8') as fd:
97
            fd.writelines([(task.text + self.newline) for task in self.tasks])
98
#        logger.debug('%s was saved to disk.', self.filename)
99
100
    def saveDoneTask(self, task):
101
        doneFilename = os.path.join(os.path.dirname(self.filename), 'done.txt')
102
        with open(doneFilename, 'at', encoding='utf-8') as fd:
103
            fd.write(task.text + self.newline)
104
        logger.debug('"%s" was appended to "%s"', task.text, doneFilename)
105
106
    def getAllContexts(self):
107
        return self._getAllX("contexts")
108
109
    def getAllProjects(self):
110
        return self._getAllX("projects")
111
112
    def _getAllX(self, name):
113
        res = {}
114
        for task in self.tasks:
115
            for element in getattr(task, name):
116
                if element not in res:
117
                    res[element] = [0, 0]
118
                idx = 1 if task.is_complete else 0
119
                res[element][idx] += 1
120
        return res
121
122
    def getAllPriorities(self):
123
        return self._getAllX("priority")
124
125
    def getAllDueRanges(self):
126
        dueRanges = dict()
127
        filters = [DueTodayFilter(), DueTomorrowFilter(), DueThisWeekFilter(), DueThisMonthFilter(), DueOverdueFilter()]
128
        for task in self.tasks:
129
            idx = 1 if task.is_complete else 0
130
            for flt in filters:
131
                if flt.isMatch(task):
132
                    if not (flt in dueRanges):
133
                        dueRanges[flt] = [0, 0]
134
                    dueRanges[flt][idx] += 1
135
        return dueRanges
136
137
    def getTasksCounters(self):
138
        counters = dict({
139
            'All': [0, 0],
140
            'Uncategorized': [0, 0],
141
            'Contexts': [0, 0],
142
            'Projects': [0, 0],
143
            'Complete': [0, 0],
144
            'Priority': [0, 0],
145
            'Due': [0, 0]
146
        })
147
        for task in self.tasks:
148
            nbProjects = len(task.projects)
149
            nbContexts = len(task.contexts)
150
            idx = 1 if task.is_complete else 0
151
            counters['All'][idx] += 1
152
            if nbProjects > 0:
153
                counters['Projects'][idx] += 1
154
            if nbContexts > 0:
155
                counters['Contexts'][idx] += 1
156
            if nbContexts == 0 and nbProjects == 0:
157
                counters['Uncategorized'][idx] += 1
158
            if task.due:
159
                counters['Due'][idx] += 1
160
            if task.priority != "":
161
                counters['Priority'][idx] += 1
162
        return counters
163
164
165
class FileObserver(QtCore.QFileSystemWatcher):
166
167
    fileChangetSig = QtCore.pyqtSignal(str)
168
    dirChangetSig = QtCore.pyqtSignal(str)
169
170
    def __init__(self):
171
        logger.debug('Setting up FileObserver instance.')
172
        super().__init__()
173
        self.fileChanged.connect(self.fileChangedHandler)
174
        self.directoryChanged.connect(self.dirChangedHandler)
175
176
    def fileChangedHandler(self, path):
177
        logger.debug('Detected external file change for file %s', path)
178
        self.removePath(path)
179
        self.fileChangetSig.emit(path)
180
181
    def dirChangedHandler(self, path):
182
        logger.debug('Detected directory change for file %s', path)
183
        self.dirChangetSig.emit(path)
184
185
    def clear(self):
186
        if self.files():
187
#            logger.debug('Clearing watchlist.')
188
            self.removePaths(self.files())
189