yorm.utilities._instantiate()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 3
dl 0
loc 8
ccs 0
cts 0
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
"""Functions to interact with mapped classes and instances."""
2
3 1
import inspect
4 1
import logging
5
import string
6 1
import glob
7
import types
8 1
9
import parse
10
11 1
from . import common, exceptions
12
13
log = logging.getLogger(__name__)
14
15
16
def create(class_or_instance, *args, overwrite=False, **kwargs):
    """Create a new mapped object and write it to its file.

    A class is instantiated with ``*args`` and ``**kwargs``; an existing
    instance is used as-is.

    NOTE: Calling this function is unnecessary with 'auto_create' enabled.

    Raises ``exceptions.DuplicateMappingError`` when the mapper reports
    that the object already exists and ``overwrite`` is false.

    """
    obj = _instantiate(class_or_instance, *args, **kwargs)
    obj_mapper = common.get_mapper(obj, expected=True)

    already_mapped = obj_mapper.exists
    if already_mapped and not overwrite:
        raise exceptions.DuplicateMappingError(
            "{!r} already exists".format(obj_mapper.path))

    # Save first, then force a load so the returned object went through
    # both steps.
    saved = save(obj)
    return load(saved)
30 1
31
32 1
def find(class_or_instance, *args, create=False, **kwargs):  # pylint: disable=redefined-outer-name
    """Return the mapped object if it exists, otherwise None.

    When ``create`` is true, an object that does not exist yet is saved
    and returned instead of None.

    """
    obj = _instantiate(class_or_instance, *args, **kwargs)
    obj_mapper = common.get_mapper(obj, expected=True)

    if obj_mapper.exists:
        return obj
    if create:
        return save(obj)
    return None
43 1
44
45
class GlobFormatter(string.Formatter):
    """Formatter that renders every field it cannot resolve as '*'."""

    # Unique sentinel standing in for a field whose value could not be found.
    WILDCARD = object()

    def get_field(self, field_name, args, kwargs):
        """Resolve a (possibly dotted/indexed) field, or yield the sentinel."""
        try:
            resolved = super().get_field(field_name, args, kwargs)
        except (KeyError, IndexError, AttributeError):
            resolved = (self.WILDCARD, None)
        return resolved

    def get_value(self, key, args, kwargs):
        """Look up a top-level argument, or yield the sentinel."""
        try:
            found = super().get_value(key, args, kwargs)
        except (KeyError, IndexError, AttributeError):
            found = self.WILDCARD
        return found

    def convert_field(self, value, conversion):
        """Apply the conversion unless the value is the sentinel."""
        if value is not self.WILDCARD:
            return super().convert_field(value, conversion)
        return value

    def format_field(self, value, format_spec):
        """Format the value; the sentinel always becomes '*'."""
        return (
            '*' if value is self.WILDCARD
            else super().format_field(value, format_spec)
        )
73 1
74
75 1
def _unpack_parsed_fields(pathfields):
    """Return a new dict of *pathfields* with a leading 'self.' cut from keys."""
    prefix = 'self.'
    unpacked = {}
    for name, value in pathfields.items():
        if name.startswith(prefix):
            name = name[len(prefix):]
        unpacked[name] = value
    return unpacked
80 1
81
82 1
def match(cls_or_path, _factory=None, **kwargs):
    """Yield all matching mapped objects.

    Can be used two ways:

    * With a YORM-decorated class, optionally with a factory callable
    * With a Python 3-style string template and a factory callable

    The factory callable must accept keyword arguments, extracted from the file
    name merged with those passed to match(). If no factory is given, the class
    itself is used as the factory (same signature).

    Keyword arguments are used to filter objects. Filtering is only done by
    filename, so only fields that are part of the path_format can be filtered
    against.

    Files that the glob pattern finds but that cannot be parsed back through
    the path format are skipped.

    This is a generator, so the errors below surface on first iteration:
    TypeError if a path format is given without a factory, and whatever the
    lookup in ``common.path_formats`` raises for an unknown class.

    """
    if isinstance(cls_or_path, type):
        path_format = common.path_formats[cls_or_path]
        # Let KeyError fail through
        if _factory is None:
            _factory = cls_or_path
    else:
        path_format = cls_or_path
        if _factory is None:
            raise TypeError("Factory must be given if a path format is given")

    gf = GlobFormatter()
    mock = types.SimpleNamespace(**kwargs)

    # Expose the filters as attributes of 'self' only while building the glob
    # pattern; 'self' must not be forwarded to the factory.
    kwargs['self'] = mock
    posix_pattern = gf.vformat(path_format, (), kwargs.copy())
    del kwargs['self']
    # NOTE: static analysis reports that the 'parse' module has no member
    # named 'compile'. The parse package does provide compile(), so this
    # looks like a false positive.
    py_pattern = parse.compile(path_format)

    for filename in glob.iglob(posix_pattern):
        parsed = py_pattern.parse(filename)
        if parsed is None:
            # The glob '*' is looser than the parse pattern (e.g. typed
            # fields), so a globbed file may not parse; skip it rather than
            # fail on 'None.named'.
            log.debug("Skipped %r: does not match %r", filename, path_format)
            continue
        fields = _unpack_parsed_fields(parsed.named)
        fields.update(kwargs)
        yield _factory(**fields)
122
123
124
def load(instance):
    """Force a mapped object's file to be loaded and return the object.

    NOTE: Calling this function is unnecessary. It exists for the
        aesthetic purpose of having symmetry between save and load.

    """
    common.get_mapper(instance, expected=True).load()
    return instance
136
137
138
def save(instance):
    """Write a mapped object to its file and return the object.

    The file is created first if the mapper reports it does not exist.

    NOTE: Calling this function is unnecessary with 'auto_save' enabled.

    Raises ``exceptions.DeletedFileError`` if the mapper is marked deleted.

    """
    obj_mapper = common.get_mapper(instance, expected=True)

    if obj_mapper.deleted:
        raise exceptions.DeletedFileError(
            "{!r} was deleted".format(obj_mapper.path))

    file_missing = not obj_mapper.exists
    if file_missing:
        obj_mapper.create()
    obj_mapper.save()

    return instance
156
157
158
def delete(instance):
    """Remove the file backing a mapped object."""
    common.get_mapper(instance, expected=True).delete()
163
164
165
def _instantiate(class_or_instance, *args, **kwargs):
    """Return an instance: build one from a class, or pass an instance through.

    Positional arguments are only accepted together with a class; keyword
    arguments given with an instance are ignored.

    """
    if not inspect.isclass(class_or_instance):
        assert not args
        return class_or_instance
    return class_or_instance(*args, **kwargs)
173