doorstop.core.base   F
last analyzed

Complexity

Total Complexity 66

Size/Duplication

Total Lines 371
Duplicated Lines 7.01 %

Importance

Changes 0
Metric Value
wmc 66
eloc 191
dl 26
loc 371
rs 3.12
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseFileObject.load() 0 9 3
A BaseFileObject._write() 0 10 2
A BaseFileObject.save() 0 7 1
A BaseFileObject._create() 0 15 2
A BaseValidatable.issues() 0 4 1
A BaseValidatable.get_issues() 0 3 1
A BaseFileObject._read() 0 12 2
A BaseFileObject.__ne__() 0 2 1
A BaseFileObject._dump() 0 10 1
A BaseFileObject.__init__() 0 6 1
A BaseFileObject.__hash__() 0 2 1
A BaseFileObject.__eq__() 0 2 1
A BaseFileObject._load() 0 11 1
B BaseValidatable.validate() 26 26 6
A BaseFileObject.get() 0 17 2
A BaseFileObject.extended() 0 9 3
A BaseFileObject.delete() 0 9 2
A BaseFileObject.set() 0 16 2
A BaseFileObject.relpath() 0 6 1

8 Functions

Rating   Name   Duplication   Size   Complexity  
A add_document() 0 15 5
A edit_item() 0 11 3
B delete_item() 0 18 6
A auto_save() 0 11 2
A auto_load() 0 9 1
A edit_document() 0 11 3
B add_item() 0 17 6
B delete_document() 0 20 6

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like doorstop.core.base often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-License-Identifier: LGPL-3.0-only
2
3
"""Base classes and decorators for the doorstop.core package."""
4
5
import abc
6
import functools
7
import os
8
from typing import Dict
9
10
import yaml
11
12
from doorstop import common, settings
13
from doorstop.common import DoorstopError, DoorstopInfo, DoorstopWarning
14
15
log = common.logger(__name__)
16
17
18
def add_item(func):
19
    """Add and cache the returned item."""
20
21
    @functools.wraps(func)
22
    def wrapped(self, *args, **kwargs):
23
        item = func(self, *args, **kwargs) or self
24
        if settings.ADDREMOVE_FILES and item.tree:
25
            item.tree.vcs.add(item.path)
26
        # pylint: disable=W0212
27
        if item not in item.document._items:
28
            item.document._items.append(item)
29
        if settings.CACHE_ITEMS and item.tree:
30
            item.tree._item_cache[item.uid] = item
31
            log.trace("cached item: {}".format(item))  # type: ignore
32
        return item
33
34
    return wrapped
35
36
37
def edit_item(func):
38
    """Mark the returned item as modified."""
39
40
    @functools.wraps(func)
41
    def wrapped(self, *args, **kwargs):
42
        item = func(self, *args, **kwargs) or self
43
        if settings.ADDREMOVE_FILES and item.tree:
44
            item.tree.vcs.edit(item.path)
45
        return item
46
47
    return wrapped
48
49
50
def delete_item(func):
51
    """Remove and expunge the returned item."""
52
53
    @functools.wraps(func)
54
    def wrapped(self, *args, **kwargs):
55
        item = func(self, *args, **kwargs) or self
56
        if settings.ADDREMOVE_FILES and item.tree:
57
            item.tree.vcs.delete(item.path)
58
        # pylint: disable=W0212
59
        if item in item.document._items:
60
            item.document._items.remove(item)
61
        if settings.CACHE_ITEMS and item.tree:
62
            item.tree._item_cache[item.uid] = None
63
            log.trace("expunged item: {}".format(item))  # type: ignore
64
        BaseFileObject.delete(item, item.path)
65
        return item
66
67
    return wrapped
68
69
70
def add_document(func):
71
    """Add and cache the returned document."""
72
73
    @functools.wraps(func)
74
    def wrapped(self, *args, **kwargs):
75
        document = func(self, *args, **kwargs) or self
76
        if settings.ADDREMOVE_FILES and document.tree:
77
            document.tree.vcs.add(document.config)
78
        # pylint: disable=W0212
79
        if settings.CACHE_DOCUMENTS and document.tree:
80
            document.tree._document_cache[document.prefix] = document
81
            log.trace("cached document: {}".format(document))  # type: ignore
82
        return document
83
84
    return wrapped
85
86
87
def edit_document(func):
88
    """Mark the returned document as modified."""
89
90
    @functools.wraps(func)
91
    def wrapped(self, *args, **kwargs):
92
        document = func(self, *args, **kwargs) or self
93
        if settings.ADDREMOVE_FILES and document.tree:
94
            document.tree.vcs.edit(document.config)
95
        return document
96
97
    return wrapped
98
99
100
def delete_document(func):
101
    """Remove and expunge the returned document."""
102
103
    @functools.wraps(func)
104
    def wrapped(self, *args, **kwargs):
105
        document = func(self, *args, **kwargs) or self
106
        if settings.ADDREMOVE_FILES and document.tree:
107
            document.tree.vcs.delete(document.config)
108
        # pylint: disable=W0212
109
        if settings.CACHE_DOCUMENTS and document.tree:
110
            document.tree._document_cache[document.prefix] = None
111
            log.trace("expunged document: {}".format(document))  # type: ignore
112
        try:
113
            os.rmdir(document.path)
114
        except OSError:
115
            # Directory wasn't empty
116
            pass
117
        return document
118
119
    return wrapped
120
121
122
class BaseValidatable(metaclass=abc.ABCMeta):
123
    """Abstract Base Class for objects that can be validated."""
124
125 View Code Duplication
    def validate(self, skip=None, document_hook=None, item_hook=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
126
        """Check the object for validity.
127
128
        :param skip: list of document prefixes to skip
129
        :param document_hook: function to call for custom document
130
            validation
131
        :param item_hook: function to call for custom item validation
132
133
        :return: indication that the object is valid
134
135
        """
136
        valid = True
137
        # Display all issues
138
        for issue in self.get_issues(
139
            skip=skip, document_hook=document_hook, item_hook=item_hook
140
        ):
141
            if isinstance(issue, DoorstopInfo) and not settings.WARN_ALL:
142
                log.info(issue)
143
            elif isinstance(issue, DoorstopWarning) and not settings.ERROR_ALL:
144
                log.warning(issue)
145
            else:
146
                assert isinstance(issue, DoorstopError)
147
                log.error(issue)
148
                valid = False
149
        # Return the result
150
        return valid
151
152
    @abc.abstractmethod
153
    def get_issues(self, skip=None, document_hook=None, item_hook=None):
154
        """Yield all the objects's issues.
155
156
        :param skip: list of document prefixes to skip
157
        :param document_hook: function to call for custom document
158
            validation
159
        :param item_hook: function to call for custom item validation
160
161
        :return: generator of :class:`~doorstop.common.DoorstopError`,
162
                              :class:`~doorstop.common.DoorstopWarning`,
163
                              :class:`~doorstop.common.DoorstopInfo`
164
165
        """
166
167
    @property
168
    def issues(self):
169
        """Get a list of the item's issues."""
170
        return list(self.get_issues())
171
172
173
def auto_load(func):
174
    """Call self.load() before execution."""
175
176
    @functools.wraps(func)
177
    def wrapped(self, *args, **kwargs):
178
        self.load()
179
        return func(self, *args, **kwargs)
180
181
    return wrapped
182
183
184
def auto_save(func):
185
    """Call self.save() after execution."""
186
187
    @functools.wraps(func)
188
    def wrapped(self, *args, **kwargs):
189
        result = func(self, *args, **kwargs)
190
        if self.auto:
191
            self.save()
192
        return result
193
194
    return wrapped
195
196
197
class BaseFileObject(metaclass=abc.ABCMeta):
198
    """Abstract Base Class for objects whose attributes save to a file.
199
200
    For properties that are saved to a file, decorate their getters
201
    with :func:`auto_load` and their setters with :func:`auto_save`.
202
203
    """
204
205
    auto = True  # set to False to delay automatic save until explicit save
206
207
    def __init__(self):
208
        self.path = None
209
        self.root = None
210
        self._data: Dict[str, str] = {}
211
        self._exists = True
212
        self._loaded = False
213
214
    def __hash__(self):
215
        return hash(self.path)
216
217
    def __eq__(self, other):
218
        return isinstance(other, self.__class__) and self.path == other.path
219
220
    def __ne__(self, other):
221
        return not self == other
222
223
    @staticmethod
224
    def _create(path, name):
225
        """Create a new file for the object.
226
227
        :param path: path to new file
228
        :param name: humanized name for this file
229
230
        :raises: :class:`~doorstop.common.DoorstopError` if the file
231
            already exists
232
233
        """
234
        if os.path.exists(path):
235
            raise DoorstopError("{} already exists: {}".format(name, path))
236
        common.create_dirname(path)
237
        common.touch(path)
238
239
    @abc.abstractmethod
240
    def load(self, reload=False):
241
        """Load the object's properties from its file."""
242
        # 1. Start implementations of this method with:
243
        if self._loaded and not reload:
244
            return
245
        # 2. Call self._read() and update properties here
246
        # 3. End implementations of this method with:
247
        self._loaded = True
248
249
    def _read(self, path):
250
        """Read text from the object's file.
251
252
        :param path: path to a text file
253
254
        :return: contexts of text file
255
256
        """
257
        if not self._exists:
258
            msg = "cannot read from deleted: {}".format(self.path)
259
            raise DoorstopError(msg)
260
        return common.read_text(path)
261
262
    @staticmethod
263
    def _load(text, path, **kwargs):
264
        """Load YAML data from text.
265
266
        :param text: text read from a file
267
        :param path: path to the file (for displaying errors)
268
269
        :return: dictionary of YAML data
270
271
        """
272
        return common.load_yaml(text, path, **kwargs)
273
274
    @abc.abstractmethod
275
    def save(self):
276
        """Format and save the object's properties to its file."""
277
        # 1. Call self._write() with the current properties here
278
        # 2. End implementations of this method with:
279
        self._loaded = False
280
        self.auto = True
281
282
    def _write(self, text, path):
283
        """Write text to the object's file.
284
285
        :param text: text to write to a file
286
        :param path: path to the file
287
288
        """
289
        if not self._exists:
290
            raise DoorstopError("cannot save to deleted: {}".format(self))
291
        common.write_text(text, path, end=settings.WRITE_LINESEPERATOR)
292
293
    @staticmethod
294
    def _dump(data):
295
        """Dump YAML data to text.
296
297
        :param data: dictionary of YAML data
298
299
        :return: text to write to a file
300
301
        """
302
        return yaml.dump(data, default_flow_style=False, allow_unicode=True)
303
304
    # properties #############################################################
305
306
    @property
307
    def relpath(self):
308
        """Get the item's relative path string."""
309
        assert self.path
310
        relpath = os.path.relpath(self.path, self.root)
311
        return "@{}{}".format(os.sep, relpath)
312
313
    # extended attributes ####################################################
314
315
    @property  # type: ignore
316
    @auto_load
317
    def extended(self):
318
        """Get a list of all extended attribute names."""
319
        names = []
320
        for name in self._data:
321
            if not hasattr(self, name):
322
                names.append(name)
323
        return sorted(names)
324
325
    @auto_load
326
    def get(self, name, default=None):
327
        """Get an extended attribute.
328
329
        :param name: name of extended attribute
330
        :param default: value to return for missing attributes
331
332
        :return: value of extended attribute
333
334
        """
335
        if hasattr(self, name):
336
            cname = self.__class__.__name__
337
            msg = "'{n}' can be accessed from {c}.{n}".format(n=name, c=cname)
338
            log.trace(msg)  # type: ignore
339
            return getattr(self, name)
340
        else:
341
            return self._data.get(name, default)
342
343
    @auto_load
344
    @auto_save
345
    def set(self, name, value):
346
        """Set an extended attribute.
347
348
        :param name: name of extended attribute
349
        :param value: value to set
350
351
        """
352
        if hasattr(self, name):
353
            cname = self.__class__.__name__
354
            msg = "'{n}' can be set from {c}.{n}".format(n=name, c=cname)
355
            log.trace(msg)  # type: ignore
356
            setattr(self, name, value)
357
        else:
358
            self._data[name] = value
359
360
    # actions ################################################################
361
362
    def delete(self, path):
363
        """Delete the object's file from the file system."""
364
        if self._exists:
365
            log.info("deleting {}...".format(path))
366
            common.delete(path)
367
            self._loaded = False  # force the object to reload
368
            self._exists = False  # but, prevent future access
369
        else:
370
            log.warning("already deleted: {}".format(self))
371