1 | # SPDX-License-Identifier: LGPL-3.0-only |
||
2 | |||
3 | """Common classes and functions for the `doorstop.core` package.""" |
||
4 | |||
5 | import hashlib |
||
6 | import os |
||
7 | import re |
||
8 | from base64 import urlsafe_b64encode |
||
9 | from typing import Union |
||
10 | |||
11 | import yaml |
||
12 | |||
13 | from doorstop import common, settings |
||
14 | from doorstop.common import DoorstopError |
||
15 | |||
# Module-level logger, obtained from doorstop's shared `common.logger` helper.
log = common.logger(__name__)
||
17 | |||
18 | |||
class Prefix(str):
    """Unique document prefixes.

    A prefix is the first space-delimited token of the value it is built
    from.  Prefixes compare case-insensitively, and reserved words
    (``settings.RESERVED_WORDS``) can neither be used as a prefix nor
    compare equal to one.
    """

    UNKNOWN_MESSAGE = "no document with prefix: {}"

    def __new__(cls, value=""):
        """Create a prefix, or return ``value`` unchanged if it already is one.

        :param value: object whose string form starts with the prefix

        :raises: :class:`~doorstop.common.DoorstopError` if the value is
            a reserved word (checked case-insensitively)

        """
        if isinstance(value, Prefix):
            return value
        if str(value).lower() in settings.RESERVED_WORDS:
            raise DoorstopError("cannot use reserved word: %s" % value)
        return super().__new__(cls, Prefix.load_prefix(value))  # type: ignore

    def __repr__(self):
        return "Prefix('{}')".format(self)

    def __hash__(self):  # pylint: disable=useless-super-delegation
        # Defining __eq__ would otherwise disable hashing; keep str's hash.
        return super().__hash__()

    def __eq__(self, other):
        """Compare two prefixes, ignoring case.

        Reserved words never compare equal to a prefix.
        """
        if not isinstance(other, Prefix):
            # Normalize case the same way __new__ does.  Without this, a
            # reserved word written in a different case would slip past the
            # membership test and make Prefix(other) raise DoorstopError
            # from inside an equality comparison.
            if str(other).lower() in settings.RESERVED_WORDS:
                return False
            other = Prefix(other)
        return self.lower() == other.lower()

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        # Ordering is case-insensitive, like equality.
        return self.lower() < other.lower()

    @staticmethod
    def load_prefix(value):
        """Convert a value to a prefix.

        The result is the text before the first space of ``str(value)``;
        falsy values produce an empty string.

        >>> Prefix.load_prefix("abc 123")
        'abc'
        """
        return str(value).split(' ')[0] if value else ''
||
60 | |||
61 | |||
class UID:
    """Unique item ID built from document prefix and number."""

    UNKNOWN_MESSAGE = "no{k} item with UID: {u}"  # k='parent'|'child', u=UID

    def __new__(cls, *args, **kwargs):  # pylint: disable=W0613
        # Passing an existing UID returns that same object instead of a copy.
        if args and isinstance(args[0], UID):
            return args[0]
        else:
            return super().__new__(cls)

    def __init__(self, *values, stamp=None):
        """Initialize an UID using a string, dictionary, or set of parts.

        Option 1:

        :param *values: UID + optional stamp ("UID:stamp")
        :param stamp: stamp of :class:`~doorstop.core.item.Item` (if known)

        Option 2:

        :param *values: {UID: stamp}
        :param stamp: stamp of :class:`~doorstop.core.item.Item` (if known)

        Option 3:

        :param *values: prefix, separator, number, digit count
        :param stamp: stamp of :class:`~doorstop.core.item.Item` (if known)

        Option 4:

        :param *values: prefix, separator, name
        :param stamp: stamp of :class:`~doorstop.core.item.Item` (if known)

        :raises TypeError: if the number of positional values is not
            0, 1, 3, or 4

        """
        if values and isinstance(values[0], UID):
            # __new__ returned the existing object, so only the stamp may
            # be refreshed; the parsed parts are already in place.
            self.stamp: Stamp = stamp or values[0].stamp
            return
        self.stamp = stamp or Stamp()
        # Join values
        if len(values) == 0:  # pylint: disable=len-as-condition
            self.value = ''
        elif len(values) == 1:
            value = values[0]
            if isinstance(value, str) and ':' in value:
                # split UID:stamp into a dictionary
                pair = value.rsplit(':', 1)
                value = {pair[0]: pair[1]}
            if isinstance(value, dict):
                first = list(value.items())[0]
                self.value = str(first[0])
                # an explicit ``stamp`` argument wins over the embedded one
                self.stamp = self.stamp or Stamp(first[1])
            else:
                self.value = str(value) if values[0] else ''
        elif len(values) == 4:
            # pylint: disable=no-value-for-parameter
            self.value = UID.join_uid_4(*values)
        elif len(values) == 3:
            # pylint: disable=no-value-for-parameter
            self.value = UID.join_uid_3(*values)
        else:
            raise TypeError("__init__() takes 1, 3, or 4 positional arguments")
        # Split values; a parse failure is stored and raised later by check()
        self._prefix, self._number, self._name, self._exc = UID.split_uid(self.value)

    def __repr__(self):
        if self.stamp:
            return "UID('{}', stamp='{}')".format(self.value, self.stamp)
        else:
            return "UID('{}')".format(self.value)

    def __str__(self):
        return self.value

    def __hash__(self):
        # __eq__ compares prefixes case-insensitively, so the hash must be
        # built from the lower-cased prefix; otherwise two equal UIDs could
        # land in different hash buckets.  Invalid UIDs have no parsed parts
        # (all None) and therefore share one hash value.
        prefix = None if self._prefix is None else self._prefix.lower()
        return hash((prefix, self._number, self._name))

    def __eq__(self, other):
        """Compare by parsed parts, or by lower-cased text if either is invalid."""
        if not other:
            return False
        if not isinstance(other, UID):
            other = UID(other)
        try:
            self.check()
            other.check()
            # pylint: disable=protected-access
            return (
                self._prefix == other._prefix
                and self._number == other._number
                and self._name == other._name
            )
        except DoorstopError:
            return self.value.lower() == other.value.lower()

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        """Order by prefix, then number, then name; fall back to raw text."""
        try:
            self.check()
            other.check()
            # pylint: disable=protected-access
            if self._prefix == other._prefix:
                if self._number == other._number:
                    return self._name < other._name
                else:
                    return self._number < other._number
            else:
                return self._prefix < other._prefix
        except DoorstopError:
            return self.value < other.value

    @property
    def prefix(self):
        """Get the UID's prefix."""
        self.check()
        return self._prefix

    @property
    def number(self):
        """Get the UID's number."""
        self.check()
        return self._number

    @property
    def name(self):
        """Get the UID's name."""
        self.check()
        return self._name

    @property
    def string(self):
        """Convert the UID and stamp to a single string."""
        if self.stamp:
            return "{}:{}".format(self.value, self.stamp)
        else:
            return "{}".format(self.value)

    def check(self):
        """Verify an UID is valid.

        :raises: the exception recorded by :meth:`split_uid`, if any

        """
        if self._exc:
            raise self._exc  # pylint: disable=raising-bad-type

    @staticmethod
    def split_uid(value):
        """Split an item's UID string into a prefix, number, name, and exception.

        Parsing never raises: on failure the first three elements are
        ``None`` and the last one holds the error to raise later.

        >>> UID.split_uid('ABC00123')
        (Prefix('ABC'), 123, '', None)

        >>> UID.split_uid('ABC.HLR_01-00123')
        (Prefix('ABC.HLR_01'), 123, '', None)

        >>> UID.split_uid('REQ2-001')
        (Prefix('REQ2'), 1, '', None)

        >>> UID.split_uid('REQ2-NAME')
        (Prefix('REQ2'), -1, 'NAME', None)

        >>> UID.split_uid('REQ2-NAME007')
        (Prefix('REQ2'), -1, 'NAME007', None)

        >>> UID.split_uid('REQ2-123NAME')
        (Prefix('REQ2'), -1, '123NAME', None)

        """
        # First try "<prefix><separator><number or name>"
        m = re.match("([\\w.-]+)[" + settings.SEP_CHARS + "](\\w+)", value)
        if m:
            try:
                num = int(m.group(2))
                return Prefix(m.group(1)), num, '', None
            except ValueError:
                # not numeric: treat the trailing part as a name
                return Prefix(m.group(1)), -1, m.group(2), None
        # Then try "<prefix><number>" without a separator
        m = re.match(r"([\w.-]*\D)(\d+)", value)
        if m:
            num = m.group(2)
            return Prefix(m.group(1).rstrip(settings.SEP_CHARS)), int(num), '', None
        return None, None, None, DoorstopError("invalid UID: {}".format(value))

    @staticmethod
    def join_uid_4(prefix, sep, number, digits):
        """Join the four parts of an item's UID into a string.

        The number is zero-padded on the left to ``digits`` characters.

        >>> UID.join_uid_4('ABC', '', 123, 5)
        'ABC00123'

        >>> UID.join_uid_4('REQ.H', '-', 42, 4)
        'REQ.H-0042'

        >>> UID.join_uid_4('ABC', '-', 123, 0)
        'ABC-123'

        """
        return "{}{}{}".format(prefix, sep, str(number).zfill(digits))

    @staticmethod
    def join_uid_3(prefix, sep, name):
        """Join the three parts of an item's UID into a string."""
        return "{}{}{}".format(prefix, sep, name)
||
261 | |||
262 | |||
263 | class _Literal(str): |
||
264 | """Custom type for text which should be dumped in the literal style.""" |
||
265 | |||
266 | @staticmethod |
||
267 | def representer(dumper, data): |
||
268 | """Return a custom dumper that formats str in the literal style.""" |
||
269 | return dumper.represent_scalar( |
||
270 | 'tag:yaml.org,2002:str', data, style='|' if data else '' |
||
271 | ) |
||
272 | |||
273 | |||
# Register `_Literal.representer` with PyYAML as the representer for `_Literal` values.
yaml.add_representer(_Literal, _Literal.representer)
||
275 | |||
276 | |||
class Text(str):
    """Markdown text paragraph."""

    def __new__(cls, value=""):
        """Create text from a raw value, normalized by :meth:`load_text`."""
        assert not isinstance(value, Text)
        normalized = Text.load_text(value)
        return super(Text, cls).__new__(cls, normalized)  # type: ignore

    @property
    def yaml(self):
        """Get the value to be used in YAML dumping."""
        return Text.save_text(self)

    @staticmethod
    def load_text(value):
        r"""Convert dumped text to the original string.

        Leading and trailing newlines are dropped and trailing whitespace is
        removed from every line; falsy input yields an empty string.

        >>> Text.load_text("abc\ndef")
        'abc\ndef'

        >>> Text.load_text("list:\n\n- a\n- b\n")
        'list:\n\n- a\n- b'

        """
        if not value:
            return ""
        without_leading = re.sub('^\n+', '', value)
        trimmed = re.sub('\n+$', '', without_leading)
        lines = trimmed.split('\n')
        return '\n'.join(line.rstrip() for line in lines)

    @staticmethod
    def save_text(text, end='\n'):
        """Append `end` to non-empty text and mark it for literal-style YAML.

        Empty text is returned as a plain empty string.
        """
        if not text:
            return ''
        return _Literal(text + end)
||
315 | |||
316 | |||
class Level:
    """Variable-length numerical outline level values.

    Level values cannot contain zeros. Zeros are reserved for
    identifying "heading" levels when written to file.
    """

    def __init__(self, value=None, heading=None):
        """Initialize an item level from a sequence of numbers.

        :param value: sequence of int, float, or period-delimited string
        :param heading: force a heading value (or inferred from trailing zero)

        """
        if isinstance(value, Level):
            self._parts = list(value)
            self.heading: bool = value.heading
        else:
            parts = self.load_level(value)
            if parts and parts[-1] == 0:
                # a trailing zero marks a heading and is not stored as a part
                self._parts = parts[:-1]
                self.heading = True
            else:
                self._parts = parts
                self.heading = False
        self.heading = self.heading if heading is None else heading
        if not value:
            self._adjust()

    def __repr__(self):
        if self.heading:
            level = '.'.join(str(n) for n in self._parts)
            return "Level('{}', heading=True)".format(level)
        else:
            return "Level('{}')".format(str(self))

    def __str__(self):
        return '.'.join(str(n) for n in self.value)

    def __iter__(self):
        # iteration covers the stored parts only (no heading zero)
        return iter(self._parts)

    def __len__(self):
        return len(self._parts)

    def __eq__(self, other):
        """Compare the parts, ignoring a trailing heading zero on `other`."""
        if other:
            parts = list(other)
            if parts and not parts[-1]:
                parts.pop(-1)
            return self._parts == parts
        else:
            return False

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        return self._parts < list(other)

    def __gt__(self, other):
        return self._parts > list(other)

    def __le__(self, other):
        return self._parts <= list(other)

    def __ge__(self, other):
        return self._parts >= list(other)

    def __hash__(self):
        # Equality ignores the heading flag, so the hash must too: hashing
        # ``self.value`` (which appends a 0 for headings) made a heading and
        # a non-heading level compare equal yet hash differently.
        return hash(tuple(self._parts))

    def __add__(self, value):
        parts = list(self._parts)
        parts[-1] += value
        return Level(parts, heading=self.heading)

    def __iadd__(self, value):
        self._parts[-1] += value
        self._adjust()
        return self

    def __sub__(self, value):
        parts = list(self._parts)
        parts[-1] -= value
        return Level(parts, heading=self.heading)

    def __isub__(self, value):
        self._parts[-1] -= value
        self._adjust()
        return self

    def __rshift__(self, value):
        """Indent: append `value` parts of 1 (a non-positive value dedents)."""
        if value > 0:
            parts = list(self._parts) + [1] * value
            return Level(parts, heading=self.heading)
        else:
            return self.__lshift__(abs(value))

    def __irshift__(self, value):
        if value > 0:
            self._parts += [1] * value
            self._adjust()
            return self
        else:
            return self.__ilshift__(abs(value))

    def __lshift__(self, value):
        """Dedent: drop the last `value` parts (a negative value indents)."""
        if value >= 0:
            parts = list(self._parts)
            if value:
                parts = parts[:-value]
            return Level(parts, heading=self.heading)
        else:
            return self.__rshift__(abs(value))

    def __ilshift__(self, value):
        if value >= 0:
            if value:
                self._parts = self._parts[:-value]
            self._adjust()
            return self
        else:
            return self.__irshift__(abs(value))

    @property
    def value(self):
        """Get a tuple for the level's value with heading indications."""
        parts = self._parts + ([0] if self.heading else [])
        return tuple(parts)

    @property
    def yaml(self):
        """Get the value to be used in YAML dumping."""
        return self.save_level(self.value)

    def _adjust(self):
        """Force all non-zero values.

        An empty level becomes ``1`` and every zero part becomes ``1``;
        a warning is logged whenever a correction is made.
        """
        old = self
        new = None
        if not self._parts:
            new = Level(1)
        elif 0 in self._parts:
            new = Level(1 if not n else n for n in self._parts)
        if new:
            msg = "minimum level reached, reseting: {} -> {}".format(old, new)
            log.warning(msg)
            self._parts = list(new.value)

    @staticmethod
    def load_level(value):
        """Convert an iterable, number, or level string to a list of ints.

        Multiple trailing zeros collapse into a single one.

        >>> Level.load_level("1.2.3")
        [1, 2, 3]

        >>> Level.load_level(['4', '5'])
        [4, 5]

        >>> Level.load_level(4.2)
        [4, 2]

        >>> Level.load_level([7, 0, 0])
        [7, 0]

        >>> Level.load_level(1)
        [1]

        """
        # Correct for default values
        if not value:
            value = 1
        # Correct for integers (e.g. 42) and floats (e.g. 4.2) in YAML
        if isinstance(value, (int, float)):
            value = str(value)

        # Split strings by periods
        if isinstance(value, str):
            nums = value.split('.')
        else:  # assume an iterable
            nums = value

        # Clean up multiple trailing zeros
        parts = [int(n) for n in nums]
        if parts and parts[-1] == 0:
            while parts and parts[-1] == 0:
                del parts[-1]
            parts.append(0)

        return parts

    @staticmethod
    def save_level(parts) -> Union[int, float, str]:
        """Convert a level's part into non-quoted YAML value.

        >>> Level.save_level((1,))
        1

        >>> Level.save_level((1,0))
        1.0

        >>> Level.save_level((1,0,0))
        '1.0.0'

        """
        # Join the level's parts
        level = '.'.join(str(n) for n in parts)

        # Convert formats to cleaner YAML formats
        if len(parts) == 1:
            return int(level)
        # A float would drop the trailing zero of a non-zero last part
        # (e.g. "1.10" -> 1.1), so such two-part levels stay strings.
        elif len(parts) == 2 and not (level.endswith('0') and parts[-1]):
            return float(level)

        return level

    def copy(self):
        """Return a copy of the level."""
        return Level(self.value)
||
536 | |||
537 | |||
class Stamp:
    """Hashed content for change tracking.

    :param values: one of the following:

        - objects to hash as strings
        - existing string stamp
        - `True` - manually-confirmed matching hash, to be replaced later
        - `False` | `None` | (nothing) - manually-confirmed mismatching hash

    """

    def __init__(self, *values):
        if len(values) == 1:
            (single,) = values
            if to_bool(single):
                # boolean-like truthy value: confirmed match
                self.value = True
            elif not single:
                # falsy value: confirmed mismatch
                self.value = None
            elif isinstance(single, str):
                # an existing stamp string is stored as-is
                self.value = single
            else:
                self.value = self.digest(*values)
        elif values:
            # several objects: hash them together
            self.value = self.digest(*values)
        else:
            self.value = None

    def __repr__(self):
        return "Stamp({})".format(repr(self.value))

    def __str__(self):
        # only real string stamps have a text form
        return self.value if isinstance(self.value, str) else ''

    def __bool__(self):
        return bool(self.value)

    def __eq__(self, other):
        return self.value == other

    def __ne__(self, other):
        return not self == other

    @property
    def yaml(self):
        """Get the value to be used in YAML dumping."""
        return self.value

    @staticmethod
    def digest(*values) -> str:
        """Hash the string form of each value (SHA-256, URL-safe base64)."""
        hasher = hashlib.sha256()
        for item in values:
            hasher.update(str(item).encode())
        raw = hasher.digest()
        return urlsafe_b64encode(raw).decode('utf-8')
||
597 | |||
598 | |||
def to_bool(obj):
    """Convert a boolean-like object.

    Strings are true only when, lower-cased and stripped, they read
    ``yes``, ``true``, ``enabled`` or ``1``; any other object is passed
    through :func:`bool`.

    >>> to_bool(1)
    True

    >>> to_bool(0)
    False

    >>> to_bool(' True ')
    True

    >>> to_bool('F')
    False

    """
    if not isinstance(obj, str):
        return bool(obj)
    normalized = obj.lower().strip()
    return normalized in ('yes', 'true', 'enabled', '1')
||
619 | |||
620 | |||
def is_tree(obj):
    """Tell whether the object is tree-like (it has a `documents` attribute)."""
    tree_like = hasattr(obj, 'documents')
    return tree_like
||
624 | |||
625 | |||
def is_document(obj):
    """Tell whether the object is document-like (it has an `items` attribute)."""
    document_like = hasattr(obj, 'items')
    return document_like
||
629 | |||
630 | |||
def is_item(obj):
    """Tell whether the object is item-like (it has a `text` attribute)."""
    item_like = hasattr(obj, 'text')
    return item_like
||
634 | |||
635 | |||
def iter_documents(obj, path, ext):
    """Get an iterator of documents from a tree or document-like object.

    For a tree, every document is yielded together with `path` joined to
    the document's prefix plus `ext`.  Any other object is treated as a
    single document and yielded with `path` unchanged.
    """
    if not is_tree(obj):
        # assume a document-like object
        log.debug("iterating over document-like object...")
        yield obj, path
        return
    # a tree
    log.debug("iterating over tree...")
    for doc in obj.documents:
        filename = doc.prefix + ext
        yield doc, os.path.join(path, filename)
||
648 | |||
649 | |||
def iter_items(obj):
    """Get an iterable of items from an item, list, or document.

    A document-like object yields its `items`; any other iterable is
    iterated directly; anything else is treated as a single item and
    returned wrapped in a list.
    """
    if is_document(obj):
        # a document
        log.debug("iterating over document...")
        return (item for item in obj.items)
    try:
        # an iterable (of items)
        log.debug("iterating over document-like object...")
        items = iter(obj)
    except TypeError:
        # assume an item
        log.debug("iterating over an item (in a container)...")
        items = [obj]
    return items
||
664 |