Completed
Push — master ( 940f1a...75e7b2 )
by Satoru
56s
created

anyconfig.convert_to()   F

Complexity

Conditions 10

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 10
dl 0
loc 32
rs 3.1304

How to fix   Complexity   

Complexity

Complex classes like anyconfig.convert_to() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2011 - 2015 Red Hat, Inc.
3
# Copyright (C) 2011 - 2013 Satoru SATOH <ssato redhat.com>
4
# License: MIT
5
#
6
"""Merge-able dict.
7
8
.. versionadded:: 0.3.1
9
   Added naive and partial implementation of JSON Pointer support
10
11
.. versionchanged:: 0.5.0
12
   Convert collections.namedtuple objects to dicts recursively
13
14
.. note::
15
   JSON Pointer: http://tools.ietf.org/html/rfc6901
16
"""
17
from __future__ import absolute_import
18
19
import collections
20
import functools
21
import operator
22
import re
23
24
from .compat import iteritems, UserDict
25
from anyconfig.utils import is_iterable
26
27
# TBD: Keep items' order:
28
# from collections import OrderedDict as dict
29
30
31
# Names of the merge strategies understood by MergeableDict.update().
MS_REPLACE = "replace"
MS_NO_REPLACE = "noreplace"
MS_DICTS = "merge_dicts"
MS_DICTS_AND_LISTS = "merge_dicts_and_lists"

MERGE_STRATEGIES = (MS_REPLACE, MS_NO_REPLACE, MS_DICTS, MS_DICTS_AND_LISTS)

# Separator candidates; parse_path() tries them in this order.
PATH_SEPS = ('/', '.')

# JSON Pointer (RFC 6901) array index tokens: "0" or digits without a leading
# zero (plus "-" for the setter variant).  The patterns are anchored with \Z
# because they are used with .match(), which otherwise accepts any token that
# merely *starts* with an index (e.g. "1x" or "01").
_JSNP_GET_ARRAY_IDX_REG = re.compile(r"(?:0|[1-9][0-9]*)\Z")
_JSNP_SET_ARRAY_IDX = re.compile(r"(?:0|[1-9][0-9]*|-)\Z")

# Key under which create_from() stores the class name of a namedtuple.
NAMEDTUPLE_CLS_KEY = "_namedtuple_cls_"
44
45
46
def jsnp_unescape(jsn_s):
    """
    Decode the escape sequences of an encoded JSON Pointer expression.

    '~1' is turned into '/' first and '~0' into '~' afterwards, so that an
    encoded '~01' ends up as the literal text '~1'.

    :param jsn_s: JSON Pointer expression which may contain '~0' and '~1'
    :return: The decoded string

    >>> jsnp_unescape("/a~1b")
    '/a/b'
    >>> jsnp_unescape("~1aaa~1~0bbb")
    '/aaa/~bbb'
    """
    decoded = jsn_s
    # Order matters: '~1' has to be handled before '~0'.
    for escaped, plain in (('~1', '/'), ('~0', '~')):
        decoded = decoded.replace(escaped, plain)

    return decoded
57
58
59
def parse_path(path, seps=PATH_SEPS):
    """
    Split a path expression into the list of its path items.

    The first separator of `seps` that occurs in `path` is used to split it;
    empty items produced by the split are dropped.

    :param path: Path expression may contain separator chars.
    :param seps: Separator char candidates, tried in the given order.

    :return: A list of keys to fetch object[s] later.

    >>> parse_path('')
    []
    >>> parse_path('/')  # JSON Pointer spec expects this behavior.
    ['']
    >>> parse_path('/a') == parse_path('.a') == ['a']
    True
    >>> parse_path('a') == parse_path('a.') == ['a']
    True
    >>> parse_path('/a/b/c') == parse_path('a.b.c') == ['a', 'b', 'c']
    True
    >>> parse_path('abc')
    ['abc']
    """
    if not path:
        return []

    found = next((sep for sep in seps if sep in path), None)
    if found is None:  # No separator at all: the path is a single key.
        return [path]

    if path == found:  # Special case, '/' or '.' only.
        return ['']

    return list(filter(None, path.split(found)))
91
92
93
def _get_by_path_item(obj, key):
    """Fetch `key` from `obj`, as an integer index if `obj` is an array."""
    matched = _JSNP_GET_ARRAY_IDX_REG.match(key)
    # The whole token must be an index: "1x" or "01" must not be read as 1,
    # and int() must never see a token which is not made of digits only.
    if is_iterable(obj) and matched and matched.group(0) == key:
        return obj[int(key)]

    return obj[key]


def get(dic, path, seps=PATH_SEPS):
    """
    getter for nested dicts.

    .. versionchanged:: 0.3.1
       Added naive and partial implementation of JSON Pointer support

    Every path item is JSON-Pointer-unescaped and looked up in turn.  An item
    which is an array index ("0", "1", ...) is converted to an integer when
    the object it is applied to is an array (as decided by `is_iterable`),
    at any depth of the path.

    :param dic: Dict or dict-like object
    :param path: Path expression to point object wanted
    :param seps: Separator char candidates

    :return:
        A tuple of (result_object, error_message); the message is '' on
        success, and the result is None if the lookup failed.  `dic` itself
        is returned for an empty path.

    >>> d = {'a': {'b': {'c': 0, 'd': [1, 2]}}, '': 3}
    >>> get(d, '/')  # key becomes '' (empty string).
    (3, '')
    >>> get(d, "/a/b/c")
    (0, '')
    >>> sorted(get(d, "a.b")[0].items())
    [('c', 0), ('d', [1, 2])]
    >>> (get(d, "a.b.d"), get(d, "/a/b/d/1"))
    (([1, 2], ''), (2, ''))
    >>> get(d, "a.b.key_not_exist")  # doctest: +ELLIPSIS
    (None, "'...'")
    >>> get(d, "/a/b/d/2")
    (None, 'list index out of range')
    >>> get(d, "/a/b/d/-")  # doctest: +ELLIPSIS
    (None, 'list indices must be integers...')
    >>> get(d, "/a/b/d/1x")  # doctest: +ELLIPSIS
    (None, 'list indices must be integers...')
    >>> get({'a': [{'b': 1}]}, "/a/0/b")
    (1, '')
    """
    items = [jsnp_unescape(p) for p in parse_path(path, seps)]
    try:
        # reduce() gives back `dic` unchanged if there are no items.
        return (functools.reduce(_get_by_path_item, items, dic), '')
    except (TypeError, KeyError, IndexError, ValueError) as exc:
        return (None, str(exc))
138
139
140
def mk_nested_dic(path, val, seps=PATH_SEPS):
    """
    Build a chain of single-key dicts, one per path item, around `val`.

    :param path: Path expression to make a nested dict
    :param val: Value to set at the innermost key
    :param seps: Separator char candidates

    :return: The nested dict, or None if `path` yields no path items

    >>> mk_nested_dic("a.b.c", 1)
    {'a': {'b': {'c': 1}}}
    >>> mk_nested_dic("/a/b/c", 1)
    {'a': {'b': {'c': 1}}}
    """
    keys = parse_path(path, seps)
    if not keys:
        return None

    # Start from the innermost pair and wrap it level by level.
    nested = {keys[-1]: val}
    for key in reversed(keys[:-1]):
        nested = {key: nested.copy()}

    return nested
158
159
160
def set_(dic, path, val, seps=PATH_SEPS, strategy=None):
    """
    setter for nested dicts.

    A nested dict holding `val` at `path` is built and merged into `dic` by
    calling ``dic.update(nested_dict, strategy)``.

    :param dic: MergeableDict instance or other dict-like objects support
        recursive merge operations.
    :param path: Path expression to point object wanted
    :param val: Value to set
    :param seps: Separator char candidates.
    :param strategy: Merge strategy passed to `dic.update` as it is

    >>> d = MergeableDict.create(dict(a=1, b=dict(c=2, )))
    >>> set_(d, 'a.b.d', 3)
    >>> d['a']['b']['d']
    3
    """
    dic.update(mk_nested_dic(path, val, seps), strategy)
176
177
178
try:  # The ABCs live in collections.abc since Python 3.3 ...
    from collections.abc import Mapping as _Mapping
except ImportError:  # ... and only in collections itself on Python 2.
    from collections import Mapping as _Mapping


def is_dict_like(obj):
    """
    Test if given object is a dict, a Mapping or an UserDict instance.

    The Mapping ABC is resolved from `collections.abc` when available because
    the `collections.Mapping` alias was removed in Python 3.10.

    :param obj: Any object may be an instance of MergeableDict or dict.
    :return: True if `obj` is dict-like

    >>> is_dict_like("a string")
    False
    >>> is_dict_like({})
    True
    >>> is_dict_like(create_from({}))
    True
    """
    return isinstance(obj, (dict, _Mapping, UserDict))
190
191
192
def is_namedtuple(obj):
    """
    Test if given object looks like a namedtuple instance, that is, a tuple
    which also has the `_asdict` attribute.

    :param obj: Any object
    :return: True if `obj` is a tuple having `_asdict`

    >>> p0 = collections.namedtuple("Point", "x y")(1, 2)
    >>> is_namedtuple(p0)
    True
    >>> is_namedtuple(tuple(p0))
    False
    """
    if not isinstance(obj, tuple):
        return False

    return hasattr(obj, "_asdict")
201
202
203
def convert_to(mdict, to_namedtuple=False,
               _namedtuple_cls_key=NAMEDTUPLE_CLS_KEY):
    """
    Convert a MergeableDict instances to a dict object.

    Borrowed basic idea and implementation from bunch.unbunchify.
    (bunch is distributed under MIT license same as this module.)

    Dict-like objects and namedtuples are converted recursively, other
    iterables are rebuilt with their own type from the converted members and
    any other object is returned as it is.

    :param mdict: A MergeableDict instance
    :param to_namedtuple:
        Convert `mdict` to namedtuple object of which definition is created on
        the fly if True
    :param _namedtuple_cls_key:
        Special keyword to embedded the class name of namedtuple object to
        create.  See the comments in :func:`create_from` also.

    :return: A dict or namedtuple object if to_namedtuple is True

    >>> pnt = collections.namedtuple("Point", "x y")(1, 2)
    >>> convert_to(pnt) == dict(x=1, y=2)
    True
    >>> convert_to(pnt, to_namedtuple=True) == pnt
    True
    """
    if is_namedtuple(mdict):
        # A namedtuple has neither .get() nor .items(), so read its fields
        # and take the class name from its type.
        name = type(mdict).__name__
        pairs = [(k, getattr(mdict, k)) for k in mdict._fields]
    elif is_dict_like(mdict):
        name = mdict.get(_namedtuple_cls_key, "NamedTuple")
        pairs = list(iteritems(mdict))
    elif is_iterable(mdict):
        return type(mdict)(convert_to(v, to_namedtuple, _namedtuple_cls_key)
                           for v in mdict)
    else:
        return mdict

    if to_namedtuple:
        # The class name entry is meta data, not a field of the namedtuple.
        pairs = [(k, v) for k, v in pairs if k != _namedtuple_cls_key]
        keys = [k for k, _ in pairs]
        vals = [convert_to(v, to_namedtuple, _namedtuple_cls_key)
                for _, v in pairs]
        return collections.namedtuple(name, keys)(*vals)

    return dict((k, convert_to(v)) for k, v in pairs)
235
236
237
def create_from(dic, _namedtuple_cls_key=NAMEDTUPLE_CLS_KEY):
    """
    Try creating a MergeableDict instance[s] from a dict or any other objects.

    Dict-like objects become MergeableDict objects and namedtuples become
    OrderedMergeableDict objects, recursively.  Other iterables are rebuilt
    with their own type from the converted members and any other object is
    returned as it is.

    :param dic: A dict instance
    :param _namedtuple_cls_key:
        Special keyword to embedded the class name of namedtuple object to the
        MergeableDict object created. It's a hack and not elegant but I don't
        think there are another ways to make same namedtuple object from the
        MergeableDict object created from it.  It is passed down to the
        recursive calls so that nested namedtuples use the same keyword.

    :return: A MergeableDict instance or other object, see the above
    """
    if is_dict_like(dic):
        return MergeableDict((k, create_from(v, _namedtuple_cls_key))
                             for k, v in iteritems(dic))

    if is_namedtuple(dic):
        mdict = OrderedMergeableDict(
            (k, create_from(getattr(dic, k), _namedtuple_cls_key))
            for k in dic._fields)
        # Remember the class name to rebuild the namedtuple later.
        mdict[_namedtuple_cls_key] = dic.__class__.__name__
        return mdict

    if is_iterable(dic):
        return type(dic)(create_from(v, _namedtuple_cls_key) for v in dic)

    return dic
259
260
261
class MergeableDict(dict):
    """
    Dict based object supports 'merge' operation.

    The class attribute `strategy` holds the merge strategy :meth:`update`
    uses if no strategy was given explicitly.
    """

    strategy = MS_DICTS

    @classmethod
    def create(cls, obj):
        """Create an instance from any object."""
        return create_from(obj)

    @classmethod
    def convert_to(cls, mdict):
        """Create an object from MergeableDict instances"""
        return convert_to(mdict)

    def get_strategy(self):
        """Merge strategy"""
        return self.strategy

    def update(self, other, strategy=None, keep=False):
        """Update members recursively based on given strategy.

        Any strategy other than MS_REPLACE, MS_NO_REPLACE and
        MS_DICTS_AND_LISTS is processed as same as MS_DICTS.

        :param other: Other MergeableDict instance to merge
        :param strategy:
            Merge strategy; the result of :meth:`get_strategy` is used if
            it's None
        :param keep:
            Keep original value if type of original value is not a dict nor
            list.  It will be simply replaced with new value by default.
        """
        if strategy is None:
            strategy = self.get_strategy()

        if strategy == MS_REPLACE:
            self.update_w_replace(other)
        elif strategy == MS_NO_REPLACE:
            self.update_wo_replace(other)
        elif strategy == MS_DICTS_AND_LISTS:
            self.update_w_merge(other, merge_lists=True, keep=keep)
        else:
            self.update_w_merge(other, merge_lists=False, keep=keep)

    def update_w_replace(self, other):
        """Update and replace self w/ other if both has same keys.

        Nothing happens if `other` is not a dict-like object.

        :param other: object of which type is same as self's.

        >>> md0 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
        >>> md1 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz"))
        >>> md0.update_w_replace(md1)
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
        True
        """
        if is_dict_like(other):
            for key, val in iteritems(other):
                self[key] = val

    def update_wo_replace(self, other):
        """Update self w/ other but never replace self w/ other.

        Nothing happens if `other` is not a dict-like object.

        :param other: object of which type is same as self's.

        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=[1, 3], c="abc"))
        >>> md2 = MergeableDict.create(dict(a=2, b=[0, 1], c="xyz", d=None))
        >>> md0.update_wo_replace(md2)
        >>> all(md0[k] != md2[k] for k in ("a", "b", "c"))
        True
        >>> all(md0[k] == md1[k] for k in ("a", "b", "c"))
        True
        >>> md0["d"] == md2["d"]
        True
        """
        if is_dict_like(other):
            for key, val in iteritems(other):
                if key not in self:
                    self[key] = val

    def update_w_merge(self, other, merge_lists=False, keep=False):
        """
        Merge members recursively. Behavior of merge will be vary depends on
        types of original and new values.

        - dict vs. dict -> merge recursively
        - list vs. list -> vary depends on `merge_lists`. see its description.
        - other objects vs. any -> vary depends on `keep`. see its description.

        Nothing happens if `other` is not a dict-like object.  An original
        value which is dict-like but not a MergeableDict, a plain dict for
        example, is converted to a MergeableDict before it gets merged.

        :param other: Dict-like object to merge into self
        :param merge_lists: Merge not only dicts but also lists. For example,

            [1, 2, 3], [3, 4] ==> [1, 2, 3, 4]
            [1, 2, 2], [2, 4] ==> [1, 2, 2, 4]

        :param keep:
            Keep original value if type of original value is not a dict nor
            list.  It will be simply replaced with new value by default.

        >>> md0 = md1 = MergeableDict.create(dict(a=1, b=dict(c=2, d=3),
        ...                                       e=[1, 2, 2]))
        >>> md2 = MergeableDict.create(dict(a=2, b=dict(d=4, f=5),
        ...                                 e=[2, 3, 4]))
        >>> md0.update_w_merge(md2, False)
        >>> md0["a"] == md2["a"]
        True
        >>> md0["b"]["d"] == md2["b"]["d"] and md0["b"]["f"] == md2["b"]["f"]
        True
        >>> md0["e"] == md2["e"]
        True

        >>> md3 = MergeableDict.create(dict(aaa=[1, 2, 3], ))
        >>> md4 = MergeableDict.create(dict(aaa=[4, 4, 5], ))
        >>> md3.update_w_merge(md4, True)
        >>> md3["aaa"] == [1, 2, 3, 4, 4, 5]
        True

        >>> md5 = MergeableDict(a=dict(b=1))  # The value is a plain dict.
        >>> md5.update_w_merge(dict(a=dict(c=2)))
        >>> sorted(md5["a"].items())
        [('b', 1), ('c', 2)]
        """
        if not is_dict_like(other):
            return

        for key, val in iteritems(other):
            val0 = self.get(key, None)  # Original value

            # A key holding None is processed as same as a missing key.
            if val0 is None:
                self[key] = val
                continue

            if is_dict_like(val0):  # It needs recursive updates.
                if not isinstance(val0, MergeableDict):
                    # Plain dicts (set_ stores such via mk_nested_dic) have
                    # no update_w_merge method; make it mergeable first.
                    val0 = self[key] = create_from(val0)

                val0.update_w_merge(val, merge_lists, keep)
            elif merge_lists and is_iterable(val) and is_iterable(val0):
                self[key] += [x for x in list(val) if x not in val0]
            elif not keep:
                self[key] = val  # Overwrite it.
390
391
392
class OrderedMergeableDict(collections.OrderedDict, MergeableDict):
    """MergeableDict keeps order of keys.
    """

    def update(self, other, strategy=None, keep=False):
        """Update members recursively based on given strategy.

        collections.OrderedDict comes first in the bases and has its own
        `update`, which would hide MergeableDict.update and reject the
        `strategy` argument; so dispatch to MergeableDict.update explicitly.

        :param other: Other MergeableDict instance to merge
        :param strategy: Merge strategy
        :param keep:
            Keep original value if type of original value is not a dict nor
            list.  It will be simply replaced with new value by default.
        """
        MergeableDict.update(self, other, strategy, keep)
396
397
# vim:sw=4:ts=4:et:
398