Passed
Pull Request — develop (#143)
by Jamie
03:24
created

GlobFormatter   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 28
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 8
c 2
b 0
f 0
dl 0
loc 28
ccs 10
cts 10
cp 1
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A get_field() 0 5 2
A get_value() 0 5 2
A format_field() 0 5 2
A convert_field() 0 5 2
1
"""Functions to interact with mapped classes and instances."""
2
3 1
import inspect
4 1
import logging
5
import string
6 1
import glob
7
import types
8 1
9
import parse
10
11 1
from . import common, exceptions
12
13
log = logging.getLogger(__name__)
14
15
16
def create(class_or_instance, *args, overwrite=False, **kwargs):
17 1
    """Create a new mapped object.
18 1
19
    NOTE: Calling this function is unnecessary with 'auto_create' enabled.
20 1
21 1
    """
22 1
    instance = _instantiate(class_or_instance, *args, **kwargs)
23
    mapper = common.get_mapper(instance, expected=True)
24 1
25
    if mapper.exists and not overwrite:
26
        msg = "{!r} already exists".format(mapper.path)
27 1
        raise exceptions.DuplicateMappingError(msg)
28
29 1
    return load(save(instance))
30 1
31
32 1
def find(class_or_instance, *args, create=False, **kwargs):  # pylint: disable=redefined-outer-name
33 1
    """Find a matching mapped object or return None."""
34 1
    instance = _instantiate(class_or_instance, *args, **kwargs)
35 1
    mapper = common.get_mapper(instance, expected=True)
36
37 1
    if mapper.exists:
38
        return instance
39
    elif create:
40 1
        return save(instance)
41
    else:
42 1
        return None
43 1
44
45
class GlobFormatter(string.Formatter):
46 1
    """Uses '*' for all unknown fields."""
47
48
    WILDCARD = object()
49
50
    def get_field(self, field_name, args, kwargs):
51
        try:
52
            return super().get_field(field_name, args, kwargs)
53 1
        except (KeyError, IndexError, AttributeError):
54
            return self.WILDCARD, None
55 1
56
    def get_value(self, key, args, kwargs):
57 1
        try:
58
            return super().get_value(key, args, kwargs)
59
        except (KeyError, IndexError, AttributeError):
60 1
            return self.WILDCARD
61
62
    def convert_field(self, value, conversion):
63
        if value is self.WILDCARD:
64
            return self.WILDCARD
65
        else:
66 1
            return super().convert_field(value, conversion)
67
68 1
    def format_field(self, value, format_spec):
69 1
        if value is self.WILDCARD:
70 1
            return '*'
71
        else:
72 1
            return super().format_field(value, format_spec)
73 1
74
75 1
def _unpack_parsed_fields(pathfields):
76
    return {
77 1
        (k[len('self.'):] if k.startswith('self.') else k): v
78
        for k, v in pathfields.items()
79
    }
80 1
81
82 1
def match(cls_or_path, _factory=None, **kwargs):
83
    """Yield all matching mapped objects.
84 1
85
    Can be used two ways:
86 1
    * With a YORM-decorated class, optionally with a factory callable
87
    * With a Python 3-style string template and a factory callable
88
89 1
    The factory callable must accept keyuword arguments, extracted from the file
90 1
    name merged with those passed to match(). If no factory is given, the class
91 1
    itself is used as the factory (same signature).
92
93 1
    Keyword arguments are used to filter objects. Filtering is only done by
94 1
    filename, so only fields that are part of the path_format can be filtered
95
    against.
96 1
    """
97
    if isinstance(cls_or_path, type):
98
        path_format = common.path_formats[cls_or_path]
99
        # Let KeyError fail through
100
        if _factory is None:
101
            _factory = cls_or_path
102
    else:
103
        path_format = cls_or_path
104
        if _factory is None:
105
            raise TypeError("Factory must be given if a path format is given")
106
107
    gf = GlobFormatter()
108
    mock = types.SimpleNamespace(**kwargs)
109
110
    kwargs['self'] = mock
111
    posix_pattern = gf.vformat(path_format, (), kwargs.copy())
112
    del kwargs['self']
113
    py_pattern = parse.compile(path_format)
114
115
    for filename in glob.iglob(posix_pattern):
116
        pathfields = py_pattern.parse(filename).named
117
        fields = _unpack_parsed_fields(pathfields)
118
        fields.update(kwargs)
119
        yield _factory(**fields)
120
121
122
def load(instance):
123
    """Force the loading of a mapped object's file.
124
125
    NOTE: Calling this function is unnecessary. It exists for the
126
        aesthetic purpose of having symmetry between save and load.
127
128
    """
129
    mapper = common.get_mapper(instance, expected=True)
130
131
    mapper.load()
132
133
    return instance
134
135
136
def save(instance):
137
    """Save a mapped object to file.
138
139
    NOTE: Calling this function is unnecessary with 'auto_save' enabled.
140
141
    """
142
    mapper = common.get_mapper(instance, expected=True)
143
144
    if mapper.deleted:
145
        msg = "{!r} was deleted".format(mapper.path)
146
        raise exceptions.DeletedFileError(msg)
147
148
    if not mapper.exists:
149
        mapper.create()
150
151
    mapper.save()
152
153
    return instance
154
155
156
def delete(instance):
157
    """Delete a mapped object's file."""
158
    mapper = common.get_mapper(instance, expected=True)
159
160
    mapper.delete()
161
162
    return None
163
164
165
def _instantiate(class_or_instance, *args, **kwargs):
166
    if inspect.isclass(class_or_instance):
167
        instance = class_or_instance(*args, **kwargs)
168
    else:
169
        assert not args
170
        instance = class_or_instance
171
172
    return instance
173