_split_path()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 25
rs 7.8799
c 0
b 0
f 0
1
#
2
# Forked from m9dicts.{api,dicts}.
3
#
4
# Copyright (C) 2011 - 2015 Red Hat, Inc.
5
# Copyright (C) 2011 - 2017 Satoru SATOH <ssato redhat.com>
6
# License: MIT
7
#
8
r"""Utility functions to operate on mapping objects such as get, set and merge.
9
10
.. versionadded: 0.8.3
11
   define _update_* and merge functions based on classes in
12
   :mod:`m9dicts.dicts`
13
14
"""
15
from __future__ import absolute_import
16
import functools
17
import operator
18
import re
19
import anyconfig.utils
20
21
22
# Merge strategies:
23
MS_REPLACE = "replace"
24
MS_NO_REPLACE = "noreplace"
25
MS_DICTS = "merge_dicts"
26
MS_DICTS_AND_LISTS = "merge_dicts_and_lists"
27
MERGE_STRATEGIES = (MS_REPLACE, MS_NO_REPLACE, MS_DICTS, MS_DICTS_AND_LISTS)
28
29
PATH_SEPS = ('/', '.')
30
31
_JSNP_GET_ARRAY_IDX_REG = re.compile(r"(?:0|[1-9][0-9]*)")
32
_JSNP_SET_ARRAY_IDX = re.compile(r"(?:0|[1-9][0-9]*|-)")
33
34
35
def _jsnp_unescape(jsn_s):
36
    """
37
    Parse and decode given encoded JSON Pointer expression, convert ~1 to
38
    / and ~0 to ~.
39
40
    .. note:: JSON Pointer: http://tools.ietf.org/html/rfc6901
41
42
    >>> _jsnp_unescape("/a~1b")
43
    '/a/b'
44
    >>> _jsnp_unescape("~1aaa~1~0bbb")
45
    '/aaa/~bbb'
46
    """
47
    return jsn_s.replace('~1', '/').replace('~0', '~')
48
49
50
def _split_path(path, seps=PATH_SEPS):
51
    """
52
    Parse path expression and return list of path items.
53
54
    :param path: Path expression may contain separator chars.
55
    :param seps: Separator char candidates.
56
    :return: A list of keys to fetch object[s] later.
57
58
    >>> assert _split_path('') == []
59
    >>> assert _split_path('/') == ['']  # JSON Pointer spec expects this.
60
    >>> for p in ('/a', '.a', 'a', 'a.'):
61
    ...     assert _split_path(p) == ['a'], p
62
    >>> assert _split_path('/a/b/c') == _split_path('a.b.c') == ['a', 'b', 'c']
63
    >>> assert _split_path('abc') == ['abc']
64
    """
65
    if not path:
66
        return []
67
68
    for sep in seps:
69
        if sep in path:
70
            if path == sep:  # Special case, '/' or '.' only.
71
                return ['']
72
            return [x for x in path.split(sep) if x]
73
74
    return [path]
75
76
77
def mk_nested_dic(path, val, seps=PATH_SEPS):
78
    """
79
    Make a nested dict iteratively.
80
81
    :param path: Path expression to make a nested dict
82
    :param val: Value to set
83
    :param seps: Separator char candidates
84
85
    >>> mk_nested_dic("a.b.c", 1)
86
    {'a': {'b': {'c': 1}}}
87
    >>> mk_nested_dic("/a/b/c", 1)
88
    {'a': {'b': {'c': 1}}}
89
    """
90
    ret = None
91
    for key in reversed(_split_path(path, seps)):
92
        ret = {key: val if ret is None else ret.copy()}
93
94
    return ret
95
96
97
def get(dic, path, seps=PATH_SEPS, idx_reg=_JSNP_GET_ARRAY_IDX_REG):
98
    """getter for nested dicts.
99
100
    :param dic: a dict[-like] object
101
    :param path: Path expression to point object wanted
102
    :param seps: Separator char candidates
103
    :return: A tuple of (result_object, error_message)
104
105
    >>> d = {'a': {'b': {'c': 0, 'd': [1, 2]}}, '': 3}
106
    >>> assert get(d, '/') == (3, '')  # key becomes '' (empty string).
107
    >>> assert get(d, "/a/b/c") == (0, '')
108
    >>> sorted(get(d, "a.b")[0].items())
109
    [('c', 0), ('d', [1, 2])]
110
    >>> (get(d, "a.b.d"), get(d, "/a/b/d/1"))
111
    (([1, 2], ''), (2, ''))
112
    >>> get(d, "a.b.key_not_exist")  # doctest: +ELLIPSIS
113
    (None, "'...'")
114
    >>> get(d, "/a/b/d/2")
115
    (None, 'list index out of range')
116
    >>> get(d, "/a/b/d/-")  # doctest: +ELLIPSIS
117
    (None, 'list indices must be integers...')
118
    """
119
    items = [_jsnp_unescape(p) for p in _split_path(path, seps)]
120
    if not items:
121
        return (dic, '')
122
    try:
123
        if len(items) == 1:
124
            return (dic[items[0]], '')
125
126
        prnt = functools.reduce(operator.getitem, items[:-1], dic)
127
        arr = anyconfig.utils.is_list_like(prnt) and idx_reg.match(items[-1])
128
        return (prnt[int(items[-1])], '') if arr else (prnt[items[-1]], '')
129
130
    except (TypeError, KeyError, IndexError) as exc:
131
        return (None, str(exc))
132
133
134
def set_(dic, path, val, seps=PATH_SEPS):
135
    """setter for nested dicts.
136
137
    :param dic: a dict[-like] object support recursive merge operations
138
    :param path: Path expression to point object wanted
139
    :param seps: Separator char candidates
140
141
    >>> d = dict(a=1, b=dict(c=2, ))
142
    >>> set_(d, 'a.b.d', 3)
143
    >>> d['a']['b']['d']
144
    3
145
    """
146
    merge(dic, mk_nested_dic(path, val, seps), ac_merge=MS_DICTS)
147
148
149
def _are_list_like(*objs):
150
    """
151
    >>> _are_list_like([], (), [x for x in range(10)], (x for x in range(4)))
152
    True
153
    >>> _are_list_like([], {})
154
    False
155
    >>> _are_list_like([], "aaa")
156
    False
157
    """
158
    return all(anyconfig.utils.is_list_like(obj) for obj in objs)
159
160
161
def _update_with_replace(self, other, key, val=None, **options):
162
    """
163
    Replace value of a mapping object `self` with `other` has if both have same
164
    keys on update. Otherwise, just keep the value of `self`.
165
166
    :param self: mapping object to update with `other`
167
    :param other: mapping object to update `self`
168
    :param key: key of mapping object to update
169
    :param val: value to update self alternatively
170
171
    :return: None but `self` will be updated
172
    """
173
    self[key] = other[key] if val is None else val
174
175
176
def _update_wo_replace(self, other, key, val=None, **options):
177
    """
178
    Never update (replace) the value of `self` with `other`'s, that is, only
179
    the values `self` does not have its key will be added on update.
180
181
    :param self: mapping object to update with `other`
182
    :param other: mapping object to update `self`
183
    :param key: key of mapping object to update
184
    :param val: value to update self alternatively
185
186
    :return: None but `self` will be updated
187
    """
188
    if key not in self:
189
        _update_with_replace(self, other, key, val=val)
190
191
192
def _merge_list(self, key, lst):
193
    """
194
    :param key: self[key] will be updated
195
    :param lst: Other list to merge
196
    """
197
    self[key] += [x for x in lst if x not in self[key]]
198
199
200
def _merge_other(self, key, val):
201
    """
202
    :param key: self[key] will be updated
203
    :param val: Other val to merge (update/replace)
204
    """
205
    self[key] = val  # Just overwrite it by default implementation.
206
207
208
def _update_with_merge(self, other, key, val=None, merge_lists=False,
209
                       **options):
210
    """
211
    Merge the value of self with other's recursively. Behavior of merge will be
212
    vary depends on types of original and new values.
213
214
    - mapping vs. mapping -> merge recursively
215
    - list vs. list -> vary depends on `merge_lists`. see its description.
216
217
    :param other: a dict[-like] object or a list of (key, value) tuples
218
    :param key: key of mapping object to update
219
    :param val: value to update self[key]
220
    :param merge_lists:
221
        Merge not only dicts but also lists. For example,
222
223
        [1, 2, 3], [3, 4] ==> [1, 2, 3, 4]
224
        [1, 2, 2], [2, 4] ==> [1, 2, 2, 4]
225
226
    :return: None but `self` will be updated
227
    """
228
    if val is None:
229
        val = other[key]
230
231
    if key in self:
232
        val0 = self[key]  # Original value
233
        if anyconfig.utils.is_dict_like(val0):  # It needs recursive updates.
234
            merge(self[key], val, merge_lists=merge_lists, **options)
235
        elif merge_lists and _are_list_like(val, val0):
236
            _merge_list(self, key, val)
237
        else:
238
            _merge_other(self, key, val)
239
    else:
240
        self[key] = val
241
242
243
def _update_with_merge_lists(self, other, key, val=None, **options):
244
    """
245
    Similar to _update_with_merge but merge lists always.
246
247
    :param self: mapping object to update with `other`
248
    :param other: mapping object to update `self`
249
    :param key: key of mapping object to update
250
    :param val: value to update self alternatively
251
252
    :return: None but `self` will be updated
253
    """
254
    _update_with_merge(self, other, key, val=val, merge_lists=True, **options)
255
256
257
_MERGE_FNS = {MS_REPLACE: _update_with_replace,
258
              MS_NO_REPLACE: _update_wo_replace,
259
              MS_DICTS: _update_with_merge,
260
              MS_DICTS_AND_LISTS: _update_with_merge_lists}
261
262
263
def _get_update_fn(strategy):
264
    """
265
    Select dict-like class based on merge strategy and orderness of keys.
266
267
    :param merge: Specify strategy from MERGE_STRATEGIES of how to merge dicts.
268
    :return: Callable to update objects
269
    """
270
    if strategy is None:
271
        strategy = MS_DICTS
272
    try:
273
        return _MERGE_FNS[strategy]
274
    except KeyError:
275
        if callable(strategy):
276
            return strategy
277
        else:
278
            raise ValueError("Wrong merge strategy: %r" % strategy)
279
280
281
def merge(self, other, ac_merge=MS_DICTS, **options):
282
    """
283
    Update (merge) a mapping object `self` with other mapping object or an
284
    iterable yields (key, value) tuples based on merge strategy `ac_merge`.
285
286
    :param others: a list of dict[-like] objects or (key, value) tuples
287
    :param another: optional keyword arguments to update self more
288
    :param ac_merge: Merge strategy to choose
289
    """
290
    _update_fn = _get_update_fn(ac_merge)
291
292
    if hasattr(other, "keys"):
293
        for key in other:
294
            _update_fn(self, other, key, **options)
295
    else:
296
        try:
297
            for key, val in other:
298
                _update_fn(self, other, key, val=val, **options)
299
        except (ValueError, TypeError) as exc:  # Re-raise w/ info.
300
            raise type(exc)("%s other=%r" % (str(exc), other))
301
302
303
def _make_recur(obj, make_fn, ac_ordered=False, ac_dict=None, **options):
304
    """
305
    :param obj: A mapping objects or other primitive object
306
    :param make_fn: Function to make/convert to
307
    :param ac_ordered: Use OrderedDict instead of dict to keep order of items
308
    :param ac_dict: Callable to convert `obj` to mapping object
309
    :param options: Optional keyword arguments.
310
311
    :return: Mapping object
312
    """
313
    if ac_dict is None:
314
        ac_dict = anyconfig.compat.OrderedDict if ac_ordered else dict
315
316
    return ac_dict((k, None if v is None else make_fn(v, **options))
317
                   for k, v in obj.items())
318
319
320
def _make_iter(obj, make_fn, **options):
321
    """
322
    :param obj: A mapping objects or other primitive object
323
    :param make_fn: Function to make/convert to
324
    :param options: Optional keyword arguments.
325
326
    :return: Mapping object
327
    """
328
    return type(obj)(make_fn(v, **options) for v in obj)
329
330
331
def convert_to(obj, ac_ordered=False, ac_dict=None, **options):
332
    """
333
    Convert a mapping objects to a dict or object of `to_type` recursively.
334
    Borrowed basic idea and implementation from bunch.unbunchify. (bunch is
335
    distributed under MIT license same as this.)
336
337
    :param obj: A mapping objects or other primitive object
338
    :param ac_ordered: Use OrderedDict instead of dict to keep order of items
339
    :param ac_dict: Callable to convert `obj` to mapping object
340
    :param options: Optional keyword arguments.
341
342
    :return: A dict or OrderedDict or object of `cls`
343
344
    >>> OD = anyconfig.compat.OrderedDict
345
    >>> convert_to(OD((('a', 1) ,)), cls=dict)
346
    {'a': 1}
347
    >>> convert_to(OD((('a', OD((('b', OD((('c', 1), ))), ))), )), cls=dict)
348
    {'a': {'b': {'c': 1}}}
349
    """
350
    options.update(ac_ordered=ac_ordered, ac_dict=ac_dict)
351
    if anyconfig.utils.is_dict_like(obj):
352
        return _make_recur(obj, convert_to, **options)
353
    elif anyconfig.utils.is_list_like(obj):
354
        return _make_iter(obj, convert_to, **options)
355
356
    return obj
357
358
# vim:sw=4:ts=4:et:
359