Completed
Push — master ( 6e4ff3...7ef454 )
by Satoru
59s
created

anyconfig.create_from()   D

Complexity

Conditions 8

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 30
rs 4
1
#
2
# Copyright (C) 2011 - 2015 Red Hat, Inc.
3
# Copyright (C) 2011 - 2013 Satoru SATOH <ssato redhat.com>
4
# License: MIT
5
#
6
"""Dict-based object supports merge operations.
7
8
.. versionchanged:: 0.4.99
9
   Support to convert namedtuple objects from/to dicts recursively.
10
11
.. versionadded:: 0.3.1
12
   Added naive and partial implementation of JSON Pointer support.
13
14
.. note:: JSON Pointer: http://tools.ietf.org/html/rfc6901
15
"""
16
from __future__ import absolute_import
17
18
import collections
19
import functools
20
import operator
21
import re
22
23
from .compat import iteritems, UserDict, OrderedDict
24
from anyconfig.utils import is_iterable
25
26
27
# Names of the merge strategies MergeableDict.update() dispatches on.
MS_REPLACE = "replace"
MS_NO_REPLACE = "noreplace"
MS_DICTS = "merge_dicts"
MS_DICTS_AND_LISTS = "merge_dicts_and_lists"

MERGE_STRATEGIES = (
    MS_REPLACE,
    MS_NO_REPLACE,
    MS_DICTS,
    MS_DICTS_AND_LISTS,
)

# Separator candidates for path expressions; parse_path() tries them in
# this order and splits on the first one found in the path.
PATH_SEPS = ('/', '.')

# JSON Pointer (RFC 6901) array index forms: a non-negative integer without
# leading zeros and, for the "set" variant only, '-' as well.
_JSNP_GET_ARRAY_IDX_REG = re.compile(r"(?:0|[1-9][0-9]*)")
_JSNP_SET_ARRAY_IDX = re.compile(r"(?:0|[1-9][0-9]*|-)")

# Key under which create_from() stores the class name of a converted
# namedtuple object.
NAMEDTUPLE_CLS_KEY = "_namedtuple_cls_"
40
41
42
def jsnp_unescape(jsn_s):
    """
    Decode the escape sequences of an encoded JSON Pointer expression.

    '~1' is turned into '/' first and '~0' into '~' afterwards, so that an
    input such as '~01' ends up as '~1' and not as '/'.

    :param jsn_s: Encoded JSON Pointer expression (string)
    :return: Decoded string

    >>> jsnp_unescape("/a~1b")
    '/a/b'
    >>> jsnp_unescape("~1aaa~1~0bbb")
    '/aaa/~bbb'
    """
    # Order matters: '~1' has to be decoded before '~0'.
    for escaped, plain in (('~1', '/'), ('~0', '~')):
        jsn_s = jsn_s.replace(escaped, plain)

    return jsn_s
53
54
55
def parse_path(path, seps=PATH_SEPS):
    """
    Split a path expression into the list of its path items.

    The first separator candidate in `seps` that occurs in `path` is the
    one used to split it; empty items produced by the split are dropped.

    :param path: Path expression may contain separator chars.
    :param seps: Separator char candidates.

    :return: A list of keys to fetch object[s] later.

    >>> parse_path('')
    []
    >>> parse_path('/')  # JSON Pointer spec expects this behavior.
    ['']
    >>> parse_path('/a') == parse_path('.a') == ['a']
    True
    >>> parse_path('a') == parse_path('a.') == ['a']
    True
    >>> parse_path('/a/b/c') == parse_path('a.b.c') == ['a', 'b', 'c']
    True
    >>> parse_path('abc')
    ['abc']
    """
    if not path:
        return []

    # Earliest candidate (in `seps` order) found in the path, if any.
    found = next((sep for sep in seps if sep in path), None)

    if found is None:
        return [path]  # No separator at all: the path is a single key.

    if path == found:  # Special case, '/' or '.' only.
        return ['']

    return [item for item in path.split(found) if item]
87
88
89
def get(dic, path, seps=PATH_SEPS):
    """
    getter for nested dicts.

    .. versionchanged:: 0.3.1
       Added naive and partial implementation of JSON Pointer support

    The path is split by :func:`parse_path` and each item is unescaped by
    :func:`jsnp_unescape`. An empty path yields `dic` itself. Only the last
    path item may be interpreted as an array index, and only if its parent
    object is iterable (as judged by `is_iterable`) and the whole item looks
    like a JSON Pointer array index (non-negative integer without leading
    zeros).

    :param dic: Dict or dict-like object
    :param path: Path expression to point object wanted
    :param seps: Separator char candidates

    :return:
        A tuple of (result_object, error_message). On success the error
        message is an empty string; if a TypeError, KeyError or IndexError
        occurred during the lookup, the result is None and the message is
        the string form of that exception.

    >>> d = {'a': {'b': {'c': 0, 'd': [1, 2]}}, '': 3}
    >>> get(d, '/')  # key becomes '' (empty string).
    (3, '')
    >>> get(d, "/a/b/c")
    (0, '')
    >>> sorted(get(d, "a.b")[0].items())
    [('c', 0), ('d', [1, 2])]
    >>> (get(d, "a.b.d"), get(d, "/a/b/d/1"))
    (([1, 2], ''), (2, ''))
    >>> get(d, "a.b.key_not_exist")  # doctest: +ELLIPSIS
    (None, "'...'")
    >>> get(d, "/a/b/d/2")
    (None, 'list index out of range')
    >>> get(d, "/a/b/d/-")  # doctest: +ELLIPSIS
    (None, 'list indices must be integers...')
    >>> get(d, "/a/b/d/1x")  # doctest: +ELLIPSIS
    (None, 'list indices must be integers...')
    """
    items = [jsnp_unescape(p) for p in parse_path(path, seps)]
    if not items:
        return (dic, '')

    try:
        if len(items) == 1:
            return (dic[items[0]], '')

        parent = functools.reduce(operator.getitem, items[:-1], dic)
        last = items[-1]

        if is_iterable(parent):
            matched = _JSNP_GET_ARRAY_IDX_REG.match(last)
            # re.match() anchors at the beginning only, so make sure the
            # match covers the whole item; otherwise something like '1x'
            # would reach int() and raise an uncaught ValueError.
            if matched is not None and matched.end() == len(last):
                return (parent[int(last)], '')

        return (parent[last], '')

    except (TypeError, KeyError, IndexError) as exc:
        return (None, str(exc))
134
135
136
def mk_nested_dic(path, val, seps=PATH_SEPS):
    """
    Build a nested dict, one level per path item, holding `val` at the
    innermost level.

    :param path: Path expression to make a nested dict
    :param val: Value to set
    :param seps: Separator char candidates

    :return:
        A nested dict, or None if the path gives no items (e.g. it's empty)

    >>> mk_nested_dic("a.b.c", 1)
    {'a': {'b': {'c': 1}}}
    >>> mk_nested_dic("/a/b/c", 1)
    {'a': {'b': {'c': 1}}}
    """
    keys = parse_path(path, seps)
    if not keys:
        return None  # There is no key to put the value under.

    # Start from the innermost dict and wrap it outwards key by key.
    nested = {keys[-1]: val}
    for outer_key in reversed(keys[:-1]):
        nested = {outer_key: nested}

    return nested
154
155
156
def set_(dic, path, val, seps=PATH_SEPS, strategy=None):
    """
    setter for nested dicts.

    A nested dict is made from `path` and `val` by :func:`mk_nested_dic`
    and handed to `dic.update` along with `strategy`.

    :param dic: MergeableDict instance or other dict-like objects support
        recursive merge operations.
    :param path: Path expression to point object wanted
    :param val: Value to set
    :param seps: Separator char candidates.
    :param strategy: Merge strategy passed to `dic.update` as it is

    >>> d = MergeableDict.create(dict(a=1, b=dict(c=2, )))
    >>> set_(d, 'a.b.d', 3)
    >>> d['a']['b']['d']
    3
    """
    dic.update(mk_nested_dic(path, val, seps), strategy)
172
173
174
def is_dict_like(obj):
    """
    Test if given object is a dict or dict-like one, that is, an instance
    of dict, the Mapping abstract base class or UserDict.

    :param obj: Any object may be an instance of MergeableDict or dict.
    :return: True if `obj` is dict-like, False otherwise

    >>> is_dict_like("a string")
    False
    >>> is_dict_like({})
    True
    >>> is_dict_like(create_from({}))
    True
    """
    # The 'collections.Mapping' alias was removed in Python 3.10; the ABC
    # lives in collections.abc since Python 3.3. Fall back to the old place
    # where collections.abc does not exist (Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        Mapping = collections.Mapping

    return isinstance(obj, (dict, Mapping, UserDict))
186
187
188
def is_namedtuple(obj):
    """
    Test if given object looks like a namedtuple object, that is, it is a
    tuple instance which also has the `_asdict` attribute.

    :param obj: Any object
    :return: True if `obj` looks like a namedtuple object, False otherwise

    >>> p0 = collections.namedtuple("Point", "x y")(1, 2)
    >>> is_namedtuple(p0)
    True
    >>> is_namedtuple(tuple(p0))
    False
    """
    if not isinstance(obj, tuple):
        return False

    return hasattr(obj, "_asdict")
197
198
199
def convert_to(obj, to_namedtuple=False,
               _namedtuple_cls_key=NAMEDTUPLE_CLS_KEY):
    """
    Recursively turn an OrderedMergeableDict or MergeableDict instance (or
    any other dict-like object) into a plain dict, or into a namedtuple
    object if `to_namedtuple` is True.

    Basic idea and implementation were borrowed from bunch.unbunchify.
    (bunch is distributed under MIT license same as this module.)

    .. note::
       If `to_namedtuple` is True and given object `obj` is MergeableDict and
       not OrderedMergeableDict, then the order of fields of result namedtuple
       object may be random and not stable because the order of MergeableDict
       may not be kept and stable.

    .. note::
       namedtuple object cannot have fields start with '_' because it may
       conflict with special methods of object like '__class__'. So the
       conversion will fail if dicts to convert have such keys.

    :param obj: An [Ordered]MergeableDict instance or other object
    :param to_namedtuple:
        Convert `obj` to namedtuple object of which definition is created on
        the fly if True instead of dict.
    :param _namedtuple_cls_key:
        Special key holding the class name of the namedtuple object to
        create; "NamedTuple" is used as the class name if `obj` does not
        have it.  See the comments in :func:`create_from` also.

    :return: A dict or namedtuple object if to_namedtuple is True
    """
    if is_dict_like(obj):
        if not to_namedtuple:
            return dict((key, convert_to(val)) for key, val in iteritems(obj))

        cls_name = obj.get(_namedtuple_cls_key, "NamedTuple")
        # The special key only carries the class name; it's not a field.
        fields = [key for key in obj.keys() if key != _namedtuple_cls_key]
        values = [convert_to(obj[key], to_namedtuple, _namedtuple_cls_key)
                  for key in fields]
        return collections.namedtuple(cls_name, fields)(*values)

    if is_namedtuple(obj):
        if to_namedtuple:
            return obj  # Nothing to do if it's nested n.t. (it should be).

        return dict((name, convert_to(getattr(obj, name)))
                    for name in obj._fields)

    if is_iterable(obj):
        # Rebuild a container of the same type from converted members.
        return type(obj)(convert_to(item, to_namedtuple, _namedtuple_cls_key)
                         for item in obj)

    return obj
248
249
250
def create_from(obj, ac_ordered=False, _namedtuple_cls_key=NAMEDTUPLE_CLS_KEY):
    """
    Recursively make MergeableDict instance[s] out of a dict or any other
    object; objects that are neither dict-like, namedtuple nor iterable are
    returned as they are.

    :param obj: A dict instance
    :param ac_ordered:
        Create an instance of OrderedMergeableDict instead of MergeableDict If
        it's True. Please note that OrderedMergeableDict class will be chosen
        for namedtuple objects regardless of this argument always to keep keys
        (fields) order.
    :param _namedtuple_cls_key:
        Special key under which the class name of the namedtuple object is
        embedded in the MergeableDict object created from it. It's a hack
        and not elegant, but it's what allows :func:`convert_to` to make
        the same namedtuple object from the MergeableDict object later.
    """
    def _recur(member):
        """Convert a member object with the same options."""
        return create_from(member, ac_ordered=ac_ordered,
                           _namedtuple_cls_key=_namedtuple_cls_key)

    if is_dict_like(obj):
        cls = MergeableDict
        if ac_ordered:
            cls = OrderedMergeableDict
        return cls((key, _recur(val)) for key, val in iteritems(obj))

    if is_namedtuple(obj):
        # Always ordered, to keep the order of the fields.
        converted = OrderedMergeableDict((name, _recur(getattr(obj, name)))
                                         for name in obj._fields)
        converted[_namedtuple_cls_key] = obj.__class__.__name__
        return converted

    if is_iterable(obj):
        # Rebuild a container of the same type from converted members.
        return type(obj)(_recur(member) for member in obj)

    return obj
280
281
282
class MergeableDict(dict):
    """
    Dict based object supports 'merge' operation.

    The way how :meth:`update` merges another dict-like object into this
    one is chosen by a merge strategy, one of MS_REPLACE, MS_NO_REPLACE,
    MS_DICTS and MS_DICTS_AND_LISTS.
    """

    # Default merge strategy used if none is given to update().
    strategy = MS_DICTS

    @classmethod
    def create(cls, obj):
        """Create an instance from any object.

        It just delegates to :func:`create_from` with default options.
        """
        return create_from(obj)

    @classmethod
    def convert_to(cls, mdict):
        """Create an object from MergeableDict instances

        It just delegates to :func:`convert_to` with default options.
        """
        return convert_to(mdict)

    def get_strategy(self):
        """Merge strategy"""
        return self.strategy

    def update(self, other, strategy=None, keep=False):
        """Update members recursively based on given strategy.

        :param other: Other MergeableDict instance to merge
        :param strategy:
            Merge strategy. The result of :meth:`get_strategy` is used if
            it's None. Any value other than MS_REPLACE, MS_NO_REPLACE and
            MS_DICTS_AND_LISTS is processed same as MS_DICTS.
        :param keep:
            Keep original value if type of original value is not a dict nor
            list.  It will be simply replaced with new value by default.
        """
        if strategy is None:
            strategy = self.get_strategy()

        if strategy == MS_REPLACE:
            self.update_w_replace(other)
        elif strategy == MS_NO_REPLACE:
            self.update_wo_replace(other)
        elif strategy == MS_DICTS_AND_LISTS:
            self.update_w_merge(other, merge_lists=True, keep=keep)
        else:
            self.update_w_merge(other, merge_lists=False, keep=keep)

    def update_w_replace(self, other):
        """Update and replace self w/ other if both has same keys.

        Nothing happens if `other` is not a dict-like object.

        :param other: object of which type is same as self's.

        >>> md0 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
        >>> md1 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz"))
        >>> md0.update_w_replace(md1)
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
        True
        """
        if is_dict_like(other):
            for key, val in iteritems(other):
                self[key] = val

    def update_wo_replace(self, other):
        """Update self w/ other but never replace self w/ other.

        Nothing happens if `other` is not a dict-like object.

        :param other: object of which type is same as self's.

        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
        >>> md2 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz", d=None))
        >>> md0.update_wo_replace(md2)
        >>> all(md0[k] != md2[k] for k in ("a", "b", "c"))
        True
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
        True
        >>> md0["d"] == md2["d"]
        True
        """
        if is_dict_like(other):
            for key, val in iteritems(other):
                if key not in self:
                    self[key] = val

    def update_w_merge(self, other, merge_lists=False, keep=False):
        """
        Merge members recursively. Behavior of merge will be vary depends on
        types of original and new values.

        - dict vs. dict -> merge recursively
        - dict vs. non-dict -> original dict is kept as it is
        - list vs. list -> vary depends on `merge_lists`. see its description.
        - other objects vs. any -> vary depends on `keep`. see its description.

        Nothing happens if `other` is not a dict-like object.

        :param other: Dict-like object to merge into self
        :param merge_lists: Merge not only dicts but also lists. For example,

            [1, 2, 3], [3, 4] ==> [1, 2, 3, 4]
            [1, 2, 2], [2, 4] ==> [1, 2, 2, 4]

        :param keep:
            Keep original value if type of original value is not a dict nor
            list.  It will be simply replaced with new value by default.

        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=dict(c=2, d=3),
        ...                                       e=[1, 2, 2]))
        >>> md2 = MergeableDict.create(dict(a=2, b=dict(d=4, f=5),
        ...                                 e=[2, 3, 4]))
        >>> md0.update_w_merge(md2, False)
        >>> md0["a"] == md2["a"]
        True
        >>> md0["b"]["d"] == md2["b"]["d"] and md0["b"]["f"] == md2["b"]["f"]
        True
        >>> md0["e"] == md2["e"]
        True

        >>> md3 = MergeableDict.create(dict(aaa=[1, 2, 3], ))
        >>> md4 = MergeableDict.create(dict(aaa=[4, 4, 5], ))
        >>> md3.update_w_merge(md4, True)
        >>> md3["aaa"] == [1, 2, 3, 4, 4, 5]
        True

        Plain dicts stored as values are merged recursively also:

        >>> md5 = MergeableDict.create({})
        >>> md5["a"] = dict(b=1)
        >>> md5.update_w_merge(dict(a=dict(c=2)))
        >>> sorted(md5["a"].items())
        [('b', 1), ('c', 2)]
        """
        if not is_dict_like(other):
            return

        for key, val in iteritems(other):
            val0 = self.get(key, None)  # Original value

            # An original value of None is processed same as a missing key.
            if val0 is None:
                self[key] = val
                continue

            if is_dict_like(val0):  # It needs recursive updates.
                if not is_dict_like(val):
                    continue  # Nothing to merge into the original dict.

                if not hasattr(val0, "update_w_merge"):
                    # Original value is a dict-like object not supporting
                    # merge operations, a plain dict set by set_ or
                    # update_w_replace for example. Replace it with a
                    # mergeable one instead of failing w/ AttributeError.
                    val0 = self[key] = create_from(
                        val0, ac_ordered=isinstance(self, OrderedDict))

                val0.update_w_merge(val, merge_lists, keep)
            elif merge_lists and is_iterable(val) and is_iterable(val0):
                self[key] += [x for x in list(val) if x not in val0]
            elif not keep:
                self[key] = val  # Overwrite it.
411
412
413
class OrderedMergeableDict(OrderedDict, MergeableDict):
    """
    MergeableDict variant which keeps the order of its keys, built by
    combining OrderedDict and MergeableDict.
    """
417
418
# vim:sw=4:ts=4:et:
419