Completed
Push — master ( b5152d...f60cc8 )
by Satoru
01:01
created

anyconfig.create_from()   C

Complexity

Conditions 8

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 15
rs 6.6667
1
#
2
# Copyright (C) 2011 - 2015 Red Hat, Inc.
3
# Copyright (C) 2011 - 2013 Satoru SATOH <ssato redhat.com>
4
# License: MIT
5
#
6
"""Merge-able dict.
7
8
.. versionadded: 0.3.1
9
   Added naive and partial implementation of JSON Pointer support
10
11
.. note::
12
   JSON Pointer: http://tools.ietf.org/html/rfc6901
13
"""
14
from __future__ import absolute_import
15
from .compat import iteritems, UserDict
16
17
import collections
18
import functools
19
import operator
20
import re
21
22
from anyconfig.utils import is_iterable
23
24
# TBD: Keep items' order:
25
# from collections import OrderedDict as dict
26
27
28
MS_REPLACE = "replace"
29
MS_NO_REPLACE = "noreplace"
30
MS_DICTS = "merge_dicts"
31
MS_DICTS_AND_LISTS = "merge_dicts_and_lists"
32
33
MERGE_STRATEGIES = (MS_REPLACE, MS_NO_REPLACE, MS_DICTS, MS_DICTS_AND_LISTS)
34
35
PATH_SEPS = ('/', '.')
36
37
_JSNP_GET_ARRAY_IDX_REG = re.compile(r"(?:0|[1-9][0-9]*)")
38
_JSNP_SET_ARRAY_IDX = re.compile(r"(?:0|[1-9][0-9]*|-)")
39
40
41
def jsnp_unescape(jsn_s):
42
    """
43
    Parse and decode given encoded JSON Pointer expression, convert ~1 to
44
    / and ~0 to ~.
45
46
    >>> jsnp_unescape("/a~1b")
47
    '/a/b'
48
    >>> jsnp_unescape("~1aaa~1~0bbb")
49
    '/aaa/~bbb'
50
    """
51
    return jsn_s.replace('~1', '/').replace('~0', '~')
52
53
54
def parse_path(path, seps=PATH_SEPS):
55
    """
56
    Parse path expression and return list of path items.
57
58
    :param path: Path expression may contain separator chars.
59
    :param seps: Separator char candidates.
60
61
    :return: A list of keys to fetch object[s] later.
62
63
    >>> parse_path('')
64
    []
65
    >>> parse_path('/')  # JSON Pointer spec expects this behavior.
66
    ['']
67
    >>> parse_path('/a') == parse_path('.a') == ['a']
68
    True
69
    >>> parse_path('a') == parse_path('a.') == ['a']
70
    True
71
    >>> parse_path('/a/b/c') == parse_path('a.b.c') == ['a', 'b', 'c']
72
    True
73
    >>> parse_path('abc')
74
    ['abc']
75
    """
76
    if not path:
77
        return []
78
79
    for sep in seps:
80
        if sep in path:
81
            if path == sep:  # Special case, '/' or '.' only.
82
                return ['']
83
            return [x for x in path.split(sep) if x]
84
85
    return [path]
86
87
88
def get(dic, path, seps=PATH_SEPS):
89
    """
90
    getter for nested dicts.
91
92
    .. versionchanged: 0.3.1
93
       Added naive and partial implementation of JSON Pointer support
94
95
    :param dic: Dict or dict-like object
96
    :param path: Path expression to point object wanted
97
    :param seps: Separator char candidates
98
99
    :return: A tuple of (result_object, error_message)
100
101
    >>> d = {'a': {'b': {'c': 0, 'd': [1, 2]}}, '': 3}
102
    >>> get(d, '/')  # key becomes '' (empty string).
103
    (3, '')
104
    >>> get(d, "/a/b/c")
105
    (0, '')
106
    >>> sorted(get(d, "a.b")[0].items())
107
    [('c', 0), ('d', [1, 2])]
108
    >>> (get(d, "a.b.d"), get(d, "/a/b/d/1"))
109
    (([1, 2], ''), (2, ''))
110
    >>> get(d, "a.b.key_not_exist")  # doctest: +ELLIPSIS
111
    (None, "'...'")
112
    >>> get(d, "/a/b/d/2")
113
    (None, 'list index out of range')
114
    >>> get(d, "/a/b/d/-")  # doctest: +ELLIPSIS
115
    (None, 'list indices must be integers...')
116
    """
117
    items = [jsnp_unescape(p) for p in parse_path(path, seps)]
118
    if not items:
119
        return (dic, '')
120
    try:
121
        if len(items) == 1:
122
            return (dic[items[0]], '')
123
124
        parent = functools.reduce(operator.getitem, items[:-1], dic)
125
126
        if is_iterable(parent) and _JSNP_GET_ARRAY_IDX_REG.match(items[-1]):
127
            return (parent[int(items[-1])], '')
128
        else:
129
            return (parent[items[-1]], '')
130
131
    except (TypeError, KeyError, IndexError) as exc:
132
        return (None, str(exc))
133
134
135
def mk_nested_dic(path, val, seps=PATH_SEPS):
136
    """
137
    Make a nested dict iteratively.
138
139
    :param path: Path expression to make a nested dict
140
    :param val: Value to set
141
    :param seps: Separator char candidates
142
143
    >>> mk_nested_dic("a.b.c", 1)
144
    {'a': {'b': {'c': 1}}}
145
    >>> mk_nested_dic("/a/b/c", 1)
146
    {'a': {'b': {'c': 1}}}
147
    """
148
    ret = None
149
    for key in reversed(parse_path(path, seps)):
150
        ret = {key: val if ret is None else ret.copy()}
151
152
    return ret
153
154
155
def set_(dic, path, val, seps=PATH_SEPS, strategy=None):
156
    """
157
    setter for nested dicts.
158
159
    :param dic: MergeableDict instance or other dict-like objects support
160
        recursive merge operations.
161
    :param path: Path expression to point object wanted
162
    :param seps: Separator char candidates.
163
164
    >>> d = MergeableDict.create(dict(a=1, b=dict(c=2, )))
165
    >>> set_(d, 'a.b.d', 3)
166
    >>> d['a']['b']['d']
167
    3
168
    """
169
    diff = mk_nested_dic(path, val, seps)
170
    dic.update(diff, strategy)
171
172
173
def is_dict_like(obj):
174
    """
175
    :param obj: Any object may be an instance of MergeableDict or dict.
176
177
    >>> is_dict_like("a string")
178
    False
179
    >>> is_dict_like({})
180
    True
181
    >>> is_dict_like(create_from({}))
182
    True
183
    """
184
    return isinstance(obj, (dict, collections.Mapping, UserDict))
185
186
187
def convert_to(mdict):
188
    """
189
    Convert a MergeableDict instances to a dict object.
190
191
    Borrowed basic idea and implementation from bunch.unbunchify.
192
    (bunch is distributed under MIT license same as this module.)
193
194
    :param mdict: A MergeableDict instance
195
    :return: A dict
196
    """
197
    if is_dict_like(mdict):
198
        return dict((k, convert_to(v)) for k, v in iteritems(mdict))
199
    elif is_iterable(mdict):
200
        return type(mdict)(convert_to(v) for v in mdict)
201
    else:
202
        return mdict
203
204
205
def create_from(dic):
206
    """
207
    Try creating a MergeableDict instance[s] from a dict or any other objects.
208
209
    :param dic: A dict instance
210
    """
211
    if is_dict_like(dic):
212
        return MergeableDict((k, create_from(v)) for k, v in iteritems(dic))
213
    elif isinstance(dic, tuple) and hasattr(dic, "_asdict"):  # namedtuple
214
        return MergeableDict((k, create_from(getattr(dic, k))) for k
215
                             in dic._fields)
216
    elif is_iterable(dic):
217
        return type(dic)(create_from(v) for v in dic)
218
    else:
219
        return dic
220
221
222
class MergeableDict(dict):
223
    """
224
    Dict based object supports 'merge' operation.
225
    """
226
227
    strategy = MS_DICTS
228
229
    @classmethod
230
    def create(cls, obj):
231
        """Create an instance from any object."""
232
        return create_from(obj)
233
234
    @classmethod
235
    def convert_to(cls, mdict):
236
        """Create an object from MergeableDict instances"""
237
        return convert_to(mdict)
238
239
    def get_strategy(self):
240
        """Merge strategy"""
241
        return self.strategy
242
243
    def update(self, other, strategy=None, keep=False):
244
        """Update members recursively based on given strategy.
245
246
        :param other: Other MergeableDict instance to merge
247
        :param strategy: Merge strategy
248
        :param keep:
249
            Keep original value if type of original value is not a dict nor
250
            list.  It will be simply replaced with new value by default.
251
        """
252
        if strategy is None:
253
            strategy = self.get_strategy()
254
255
        if strategy == MS_REPLACE:
256
            self.update_w_replace(other)
257
        elif strategy == MS_NO_REPLACE:
258
            self.update_wo_replace(other)
259
        elif strategy == MS_DICTS_AND_LISTS:
260
            self.update_w_merge(other, merge_lists=True, keep=keep)
261
        else:
262
            self.update_w_merge(other, merge_lists=False, keep=keep)
263
264
    def update_w_replace(self, other):
265
        """Update and replace self w/ other if both has same keys.
266
267
        :param other: object of which type is same as self's.
268
269
        >>> md0 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
270
        >>> md1 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz"))
271
        >>> md0.update_w_replace(md1)
272
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
273
        True
274
        """
275
        if is_dict_like(other):
276
            for key, val in iteritems(other):
277
                self[key] = val
278
279
    def update_wo_replace(self, other):
280
        """Update self w/ other but never replace self w/ other.
281
282
        :param other: object of which type is same as self's.
283
284
        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
285
        >>> md2 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz", d=None))
286
        >>> md0.update_wo_replace(md2)
287
        >>> all(md0[k] != md2[k] for k in ("a", "b", "c"))
288
        True
289
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
290
        True
291
        >>> md0["d"] == md2["d"]
292
        True
293
        """
294
        if is_dict_like(other):
295
            for key, val in iteritems(other):
296
                if key not in self:
297
                    self[key] = val
298
299
    def update_w_merge(self, other, merge_lists=False, keep=False):
300
        """
301
        Merge members recursively. Behavior of merge will be vary depends on
302
        types of original and new values.
303
304
        - dict vs. dict -> merge recursively
305
        - list vs. list -> vary depends on `merge_lists`. see its description.
306
        - other objects vs. any -> vary depends on `keep`. see its description.
307
308
        :param merge_lists: Merge not only dicts but also lists. For example,
309
310
            [1, 2, 3], [3, 4] ==> [1, 2, 3, 4]
311
            [1, 2, 2], [2, 4] ==> [1, 2, 2, 4]
312
313
        :param keep:
314
            Keep original value if type of original value is not a dict nor
315
            list.  It will be simply replaced with new value by default.
316
317
        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=dict(c=2, d=3),
318
        ...                                       e=[1, 2, 2]))
319
        >>> md2 = MergeableDict.create(dict(a=2, b=dict(d=4, f=5),
320
        ...                                 e=[2, 3, 4]))
321
        >>> md0.update_w_merge(md2, False)
322
        >>> md0["a"] == md2["a"]
323
        True
324
        >>> md0["b"]["d"] == md2["b"]["d"] and md0["b"]["f"] == md2["b"]["f"]
325
        True
326
        >>> md0["e"] == md2["e"]
327
        True
328
329
        >>> md3 = MergeableDict.create(dict(aaa=[1, 2, 3], ))
330
        >>> md4 = MergeableDict.create(dict(aaa=[4, 4, 5], ))
331
        >>> md3.update_w_merge(md4, True)
332
        >>> md3["aaa"] == [1, 2, 3, 4, 4, 5]
333
        True
334
        """
335
        if not is_dict_like(other):
336
            return
337
338
        for key, val in iteritems(other):
339
            val0 = self.get(key, None)  # Original value
340
341
            if val0 is None:
342
                self[key] = val
343
                continue
344
345
            if is_dict_like(val0):  # It needs recursive updates.
346
                self[key].update_w_merge(val, merge_lists, keep)
347
            elif merge_lists and is_iterable(val) and is_iterable(val0):
348
                self[key] += [x for x in list(val) if x not in val0]
349
            elif not keep:
350
                self[key] = val  # Overwrite it.
351
352
# vim:sw=4:ts=4:et:
353