doorstop.core.base   F
last analyzed

Complexity

Total Complexity 66

Size/Duplication

Total Lines 370
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 66
eloc 190
dl 0
loc 370
ccs 153
cts 153
cp 1
rs 3.12
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseFileObject.load() 0 9 3
A BaseFileObject._write() 0 10 2
A BaseFileObject.get() 0 17 2
A BaseFileObject.extended() 0 9 3
A BaseFileObject.save() 0 7 1
A BaseFileObject._create() 0 15 2
A BaseValidatable.issues() 0 4 1
A BaseValidatable.get_issues() 0 3 1
A BaseFileObject._read() 0 12 2
A BaseFileObject.delete() 0 9 2
B BaseValidatable.validate() 0 26 6
A BaseFileObject.__ne__() 0 2 1
A BaseFileObject.set() 0 16 2
A BaseFileObject._dump() 0 10 1
A BaseFileObject.__init__() 0 6 1
A BaseFileObject.__hash__() 0 2 1
A BaseFileObject.__eq__() 0 2 1
A BaseFileObject.relpath() 0 5 1
A BaseFileObject._load() 0 11 1

8 Functions

Rating   Name   Duplication   Size   Complexity  
A add_document() 0 15 5
A edit_item() 0 11 3
B delete_item() 0 18 6
A auto_save() 0 11 2
A auto_load() 0 9 1
A edit_document() 0 11 3
B add_item() 0 17 6
B delete_document() 0 20 6

How to fix   Complexity   

Complexity

Complex classes like doorstop.core.base often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3 1
"""Base classes and decorators for the doorstop.core package."""
4 1
5 1
import abc
6
import functools
7 1
import os
8
from typing import Dict
9 1
10 1
import yaml
11 1
12
from doorstop import common, settings
13 1
from doorstop.common import DoorstopError, DoorstopInfo, DoorstopWarning
14
15
# Module-level logger, obtained from doorstop's common.logger() for this module's name.
log = common.logger(__name__)
16 1
17
18 1
def add_item(func):
    """Add and cache the returned item.

    Decorator for methods that produce an item. The wrapped method's
    return value is used as the item; when that value is falsy, ``self``
    is used instead. After the call:

    - the item's path is added through ``item.tree.vcs`` when
      ``settings.ADDREMOVE_FILES`` is set and the item has a tree
    - the item is appended to its document's ``_items`` list unless it
      is already there
    - the item is stored in ``item.tree._item_cache`` under its UID when
      ``settings.CACHE_ITEMS`` is set and the item has a tree

    :param func: method to wrap

    :return: wrapped method, which returns the item

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        item = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and item.tree:
            item.tree.vcs.add(item.path)
        # pylint: disable=W0212
        if item not in item.document._items:
            item.document._items.append(item)
        if settings.CACHE_ITEMS and item.tree:
            item.tree._item_cache[item.uid] = item
            log.trace("cached item: {}".format(item))  # type: ignore
        return item

    return wrapped
35
36 1
37
def edit_item(func):
    """Mark the returned item as modified.

    Decorator for methods that change an item. The wrapped method's
    return value is used as the item; when that value is falsy, ``self``
    is used instead. When ``settings.ADDREMOVE_FILES`` is set and the
    item has a tree, the item's path is passed to
    ``item.tree.vcs.edit``.

    :param func: method to wrap

    :return: wrapped method, which returns the item

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        item = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and item.tree:
            item.tree.vcs.edit(item.path)
        return item

    return wrapped
48 1
49
50
def delete_item(func):
    """Remove and expunge the returned item.

    Decorator for methods that delete an item. The wrapped method's
    return value is used as the item; when that value is falsy, ``self``
    is used instead. After the call:

    - the item's path is deleted through ``item.tree.vcs`` when
      ``settings.ADDREMOVE_FILES`` is set and the item has a tree
    - the item is removed from its document's ``_items`` list if present
    - the item's entry in ``item.tree._item_cache`` is set to ``None``
      when ``settings.CACHE_ITEMS`` is set and the item has a tree
    - :meth:`BaseFileObject.delete` is called for the item's path

    :param func: method to wrap

    :return: wrapped method, which returns the item

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        item = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and item.tree:
            item.tree.vcs.delete(item.path)
        # pylint: disable=W0212
        if item in item.document._items:
            item.document._items.remove(item)
        if settings.CACHE_ITEMS and item.tree:
            # The cache key is kept; only its value is cleared
            item.tree._item_cache[item.uid] = None
            log.trace("expunged item: {}".format(item))  # type: ignore
        # Call the base implementation explicitly rather than `item.delete`
        BaseFileObject.delete(item, item.path)
        return item

    return wrapped
68
69
70 1
def add_document(func):
    """Add and cache the returned document.

    Decorator for methods that produce a document. The wrapped method's
    return value is used as the document; when that value is falsy,
    ``self`` is used instead. After the call:

    - ``document.config`` is added through ``document.tree.vcs`` when
      ``settings.ADDREMOVE_FILES`` is set and the document has a tree
    - the document is stored in ``document.tree._document_cache`` under
      its prefix when ``settings.CACHE_DOCUMENTS`` is set and the
      document has a tree

    :param func: method to wrap

    :return: wrapped method, which returns the document

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        document = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and document.tree:
            document.tree.vcs.add(document.config)
        # pylint: disable=W0212
        if settings.CACHE_DOCUMENTS and document.tree:
            document.tree._document_cache[document.prefix] = document
            log.trace("cached document: {}".format(document))  # type: ignore
        return document

    return wrapped
85
86 1
87 1
def edit_document(func):
    """Mark the returned document as modified.

    Decorator for methods that change a document. The wrapped method's
    return value is used as the document; when that value is falsy,
    ``self`` is used instead. When ``settings.ADDREMOVE_FILES`` is set
    and the document has a tree, ``document.config`` is passed to
    ``document.tree.vcs.edit``.

    :param func: method to wrap

    :return: wrapped method, which returns the document

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        document = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and document.tree:
            document.tree.vcs.edit(document.config)
        return document

    return wrapped
98 1
99 1
100 1
def delete_document(func):
    """Remove and expunge the returned document.

    Decorator for methods that delete a document. The wrapped method's
    return value is used as the document; when that value is falsy,
    ``self`` is used instead. After the call:

    - ``document.config`` is deleted through ``document.tree.vcs`` when
      ``settings.ADDREMOVE_FILES`` is set and the document has a tree
    - the document's entry in ``document.tree._document_cache`` is set
      to ``None`` when ``settings.CACHE_DOCUMENTS`` is set and the
      document has a tree
    - removal of the directory ``document.path`` is attempted; an
      :class:`OSError` from that attempt is ignored

    :param func: method to wrap

    :return: wrapped method, which returns the document

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Methods that return nothing are treated as acting on `self`
        document = func(self, *args, **kwargs) or self
        if settings.ADDREMOVE_FILES and document.tree:
            document.tree.vcs.delete(document.config)
        # pylint: disable=W0212
        if settings.CACHE_DOCUMENTS and document.tree:
            # The cache key is kept; only its value is cleared
            document.tree._document_cache[document.prefix] = None
            log.trace("expunged document: {}".format(document))  # type: ignore
        try:
            os.rmdir(document.path)
        except OSError:
            # Directory wasn't empty (deliberate best-effort cleanup)
            pass
        return document

    return wrapped
120
121
122
class BaseValidatable(metaclass=abc.ABCMeta):
    """Abstract Base Class for objects that can be validated."""

    def validate(self, skip=None, document_hook=None, item_hook=None):
        """Check the object for validity.

        Every issue yielded by :meth:`get_issues` is logged:

        - :class:`~doorstop.common.DoorstopInfo` at info level, unless
          ``settings.WARN_ALL`` is set
        - :class:`~doorstop.common.DoorstopWarning` at warning level,
          unless ``settings.ERROR_ALL`` is set
        - anything else at error level, which also marks the object as
          invalid

        :param skip: list of document prefixes to skip
        :param document_hook: function to call for custom document
            validation
        :param item_hook: function to call for custom item validation

        :return: indication that the object is valid

        """
        valid = True
        # Display all issues
        for issue in self.get_issues(
            skip=skip, document_hook=document_hook, item_hook=item_hook
        ):
            if isinstance(issue, DoorstopInfo) and not settings.WARN_ALL:
                log.info(issue)
            elif isinstance(issue, DoorstopWarning) and not settings.ERROR_ALL:
                log.warning(issue)
            else:
                # Whatever reaches this branch is reported as an error
                assert isinstance(issue, DoorstopError)
                log.error(issue)
                valid = False
        # Return the result
        return valid

    @abc.abstractmethod
    def get_issues(self, skip=None, document_hook=None, item_hook=None):
        """Yield all the object's issues.

        :param skip: list of document prefixes to skip
        :param document_hook: function to call for custom document
            validation
        :param item_hook: function to call for custom item validation

        :return: generator of :class:`~doorstop.common.DoorstopError`,
                              :class:`~doorstop.common.DoorstopWarning`,
                              :class:`~doorstop.common.DoorstopInfo`

        """

    @property
    def issues(self):
        """Get a list of the item's issues."""
        return list(self.get_issues())
171 1
172
173
def auto_load(func):
    """Call self.load() before execution.

    Decorator for methods whose result depends on loaded state:
    ``self.load()`` is invoked (with no arguments) on every call, then
    the wrapped method runs and its result is returned unchanged.

    :param func: method to wrap

    :return: wrapped method

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        self.load()
        return func(self, *args, **kwargs)

    return wrapped
182 1
183 1
184
def auto_save(func):
    """Call self.save() after execution.

    Decorator for methods that modify saved state: the wrapped method
    runs first, then ``self.save()`` is invoked if ``self.auto`` is
    truthy. The wrapped method's result is returned unchanged.

    :param func: method to wrap

    :return: wrapped method

    """

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        # Saving is skipped while automatic saving is switched off
        if self.auto:
            self.save()
        return result

    return wrapped
195
196 1
197 1
class BaseFileObject(metaclass=abc.ABCMeta):
    """Abstract Base Class for objects whose attributes save to a file.

    For properties that are saved to a file, decorate their getters
    with :func:`auto_load` and their setters with :func:`auto_save`.

    """

    auto = True  # set to False to delay automatic save until explicit save

    def __init__(self):
        self.path = None
        self.root = None
        self._data: Dict[str, str] = {}
        self._exists = True  # cleared by delete() to block later reads/writes
        self._loaded = False

    def __hash__(self):
        """Hash the object by its path, matching :meth:`__eq__`."""
        return hash(self.path)

    def __eq__(self, other):
        """Compare equal to instances of this class with the same path."""
        return isinstance(other, self.__class__) and self.path == other.path

    def __ne__(self, other):
        """Return the negation of :meth:`__eq__`."""
        return not self == other

    @staticmethod
    def _create(path, name):
        """Create a new file for the object.

        :param path: path to new file
        :param name: humanized name for this file

        :raises: :class:`~doorstop.common.DoorstopError` if the file
            already exists

        """
        if os.path.exists(path):
            raise DoorstopError("{} already exists: {}".format(name, path))
        common.create_dirname(path)
        common.touch(path)

    @abc.abstractmethod
    def load(self, reload=False):
        """Load the object's properties from its file.

        :param reload: load again even if the object is already loaded

        """
        # 1. Start implementations of this method with:
        if self._loaded and not reload:
            return
        # 2. Call self._read() and update properties here
        # 3. End implementations of this method with:
        self._loaded = True

    def _read(self, path):
        """Read text from the object's file.

        :param path: path to a text file

        :raises: :class:`~doorstop.common.DoorstopError` if the object
            has been deleted

        :return: contents of text file

        """
        if not self._exists:
            msg = "cannot read from deleted: {}".format(self.path)
            raise DoorstopError(msg)
        return common.read_text(path)

    @staticmethod
    def _load(text, path, **kwargs):
        """Load YAML data from text.

        :param text: text read from a file
        :param path: path to the file (for displaying errors)

        :return: dictionary of YAML data

        """
        return common.load_yaml(text, path, **kwargs)

    @abc.abstractmethod
    def save(self):
        """Format and save the object's properties to its file."""
        # 1. Call self._write() with the current properties here
        # 2. End implementations of this method with:
        self._loaded = False
        self.auto = True

    def _write(self, text, path):
        """Write text to the object's file.

        :param text: text to write to a file
        :param path: path to the file

        :raises: :class:`~doorstop.common.DoorstopError` if the object
            has been deleted

        """
        if not self._exists:
            raise DoorstopError("cannot save to deleted: {}".format(self))
        common.write_text(text, path)

    @staticmethod
    def _dump(data):
        """Dump YAML data to text.

        :param data: dictionary of YAML data

        :return: text to write to a file

        """
        return yaml.dump(data, default_flow_style=False, allow_unicode=True)

    # properties #############################################################

    @property
    def relpath(self):
        """Get the item's relative path string."""
        relpath = os.path.relpath(self.path, self.root)
        return "@{}{}".format(os.sep, relpath)

    # extended attributes ####################################################

    @property  # type: ignore
    @auto_load
    def extended(self):
        """Get a list of all extended attribute names."""
        # Keys of the loaded data that are not attributes of the object
        return sorted(name for name in self._data if not hasattr(self, name))

    @auto_load
    def get(self, name, default=None):
        """Get an extended attribute.

        :param name: name of extended attribute
        :param default: value to return for missing attributes

        :return: value of extended attribute

        """
        if not hasattr(self, name):
            return self._data.get(name, default)
        # The name is a regular attribute: note that and read it directly
        cname = self.__class__.__name__
        msg = "'{n}' can be accessed from {c}.{n}".format(n=name, c=cname)
        log.trace(msg)  # type: ignore
        return getattr(self, name)

    @auto_load
    @auto_save
    def set(self, name, value):
        """Set an extended attribute.

        :param name: name of extended attribute
        :param value: value to set

        """
        if hasattr(self, name):
            # The name is a regular attribute: note that and set it directly
            cname = self.__class__.__name__
            msg = "'{n}' can be set from {c}.{n}".format(n=name, c=cname)
            log.trace(msg)  # type: ignore
            setattr(self, name, value)
        else:
            self._data[name] = value

    # actions ################################################################

    def delete(self, path):
        """Delete the object's file from the file system.

        A warning is logged instead when the object was already deleted.

        :param path: path to the file to delete

        """
        if self._exists:
            log.info("deleting {}...".format(path))
            common.delete(path)
            self._loaded = False  # force the object to reload
            self._exists = False  # but, prevent future access
        else:
            log.warning("already deleted: {}".format(self))
370