Completed
Push — master ( bff2ab...5dc771 )
by Satoru
01:07
created

anyconfig.find_by_file()   C

Complexity

Conditions 7

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 7
dl 0
loc 35
rs 5.5
1
#
2
# Copyright (C) 2011 - 2015 Satoru SATOH <ssato @ redhat.com>
3
# License: MIT
4
#
5
# suppress false-positive warn at '... pkg_resources ...' line:
6
# pylint: disable=no-member
7
"""A module to aggregate config parser (loader/dumper) backends.
8
"""
9
from __future__ import absolute_import
10
11
import itertools
12
import logging
13
import operator
14
import pkg_resources
15
16
import anyconfig.compat
17
import anyconfig.utils
18
19
import anyconfig.backend.base
20
import anyconfig.backend.ini
21
import anyconfig.backend.json
22
import anyconfig.backend.properties
23
import anyconfig.backend.xml
24
25
LOGGER = logging.getLogger(__name__)
26
PARSERS = [anyconfig.backend.ini.Parser, anyconfig.backend.json.Parser,
27
           anyconfig.backend.properties.Parser, anyconfig.backend.xml.Parser]
28
29
_NA_MSG = "%s is not available. Disabled %s support."
30
31
try:
32
    import anyconfig.backend.yaml
33
    PARSERS.append(anyconfig.backend.yaml.Parser)
34
except ImportError:
35
    LOGGER.info(_NA_MSG, "yaml module", "YAML")
36
37
try:
38
    import anyconfig.backend.configobj
39
    PARSERS.append(anyconfig.backend.configobj.Parser)
40
except ImportError:
41
    LOGGER.info(_NA_MSG, "ConfigObj module", "its")
42
43
try:
44
    import anyconfig.backend.msgpack
45
    PARSERS.append(anyconfig.backend.msgpack.Parser)
46
except ImportError:
47
    LOGGER.info(_NA_MSG, "msgpack module", "MessagePack")
48
49
try:
50
    import anyconfig.backend.toml
51
    PARSERS.append(anyconfig.backend.toml.Parser)
52
except ImportError:
53
    LOGGER.info(_NA_MSG, "toml module", "TOML")
54
55
try:
56
    import anyconfig.backend.bson
57
    PARSERS.append(anyconfig.backend.bson.Parser)
58
except ImportError:
59
    LOGGER.info(_NA_MSG, "bson module in pymongo package", "BSON")
60
61
for e in pkg_resources.iter_entry_points("anyconfig_backends"):
62
    try:
63
        PARSERS.append(e.load())
64
    except ImportError:
65
        continue
66
67
68
def fst(tpl):
69
    """
70
    >>> fst((0, 1))
71
    0
72
    """
73
    return tpl[0]
74
75
76
def snd(tpl):
77
    """
78
    >>> snd((0, 1))
79
    1
80
    """
81
    return tpl[1]
82
83
84
def groupby_key(itr, keyfunc):
85
    """
86
    An wrapper function around itertools.groupby
87
88
    :param itr: Iterable object, a list/tuple/genrator, etc.
89
    :param keyfunc: Key function to sort `itr`.
90
91
    >>> itr = [("a", 1), ("b", -1), ("c", 1)]
92
    >>> res = groupby_key(itr, operator.itemgetter(1))
93
    >>> [(key, tuple(grp)) for key, grp in res]
94
    [(-1, (('b', -1),)), (1, (('a', 1), ('c', 1)))]
95
    """
96
    return itertools.groupby(sorted(itr, key=keyfunc), key=keyfunc)
97
98
99
def uniq(iterable, **kwopts):
100
    """sorted + uniq
101
102
    .. note::
103
       sorted(set(iterable), key=iterable.index) cannot be used for any
104
       iterables (generator, a list of dicts, etc.), I guess.
105
106
    :param iterable: Iterable objects, a list, generator, iterator, etc.
107
    :param kwopts: Keyword options passed to sorted()
108
    :return: a sorted list of items in iterable
109
110
    >>> uniq([1, 2, 3, 1, 2])
111
    [1, 2, 3]
112
    >>> uniq((i for i in (2, 10, 3, 2, 5, 1, 7, 3)))
113
    [1, 2, 3, 5, 7, 10]
114
    >>> uniq(({str(i): i} for i in (2, 10, 3, 2, 5, 1, 7, 3)),
115
    ...      key=lambda d: int(list(d.keys())[0]))
116
    [{'1': 1}, {'2': 2}, {'3': 3}, {'5': 5}, {'7': 7}, {'10': 10}]
117
    """
118
    return [t[0] for t in itertools.groupby(sorted(iterable, **kwopts))]
119
120
121
def is_parser(obj):
122
    """
123
    :return: True if given `obj` is an instance of parser.
124
125
    >>> is_parser("ini")
126
    False
127
    >>> is_parser(anyconfig.backend.base.Parser)
128
    False
129
    >>> is_parser(anyconfig.backend.base.Parser())
130
    True
131
    """
132
    return isinstance(obj, anyconfig.backend.base.Parser)
133
134
135
def list_parsers_by_type(cps=None):
136
    """
137
    :return: List (generator) of (config_type, [config_parser])
138
    """
139
    if cps is None:
140
        cps = PARSERS
141
142
    return ((t, sorted(p, key=operator.methodcaller("priority"))) for t, p
143
            in groupby_key(cps, operator.methodcaller("type")))
144
145
146
def _list_xppairs(xps):
147
    """List config parsers by priority.
148
    """
149
    return sorted((snd(xp) for xp in xps),
150
                  key=operator.methodcaller("priority"))
151
152
153
def list_parsers_by_extension(cps=None):
154
    """
155
    :return: List (generator) of (config_ext, [config_parser])
156
    """
157
    if cps is None:
158
        cps = PARSERS
159
160
    cps_by_ext = anyconfig.utils.concat(([(x, p) for x in p.extensions()] for p
161
                                         in cps))
162
163
    return ((x, _list_xppairs(xps)) for x, xps in groupby_key(cps_by_ext, fst))
164
165
166
def find_by_file(path_or_stream, cps=None, is_path_=False):
167
    """
168
    Find config parser by the extension of file `path_or_stream`, file path or
169
    stream (a file or file-like objects).
170
171
    :param path_or_stream: Config file path or file/file-like object
172
    :param cps: A list of pairs :: (type, parser_class)
173
    :param is_path_: True if given `path_or_stream` is a file path
174
175
    :return: Config Parser class found
176
177
    >>> find_by_file("a.missing_cnf_ext") is None
178
    True
179
    >>> strm = anyconfig.compat.StringIO()
180
    >>> find_by_file(strm) is None
181
    True
182
    >>> find_by_file("a.json")
183
    <class 'anyconfig.backend.json.Parser'>
184
    >>> find_by_file("a.json", is_path_=True)
185
    <class 'anyconfig.backend.json.Parser'>
186
    """
187
    if cps is None:
188
        cps = PARSERS
189
190
    if not is_path_ and not anyconfig.utils.is_path(path_or_stream):
191
        path_or_stream = anyconfig.utils.get_path_from_stream(path_or_stream)
192
        if path_or_stream is None:
193
            return None  # There is no way to detect file path.
194
195
    ext_ref = anyconfig.utils.get_file_extension(path_or_stream)
196
    for ext, psrs in list_parsers_by_extension(cps):
197
        if ext == ext_ref:
198
            return psrs[-1]
199
200
    return None
201
202
203
def find_by_type(cptype, cps=None):
204
    """
205
    Find config parser by file's extension.
206
207
    :param cptype: Config file's type
208
    :param cps: A list of pairs :: (type, parser_class)
209
    :return: Config Parser class found
210
211
    >>> find_by_type("missing_type") is None
212
    True
213
    """
214
    if cps is None:
215
        cps = PARSERS
216
217
    for type_, psrs in list_parsers_by_type(cps):
218
        if type_ == cptype:
219
            return psrs[-1] or None
220
221
    return None
222
223
224
def find_parser(path_or_stream, forced_type=None, is_path_=False):
225
    """
226
    Find out config parser object appropriate to load from a file of given path
227
    or file/file-like object.
228
229
    :param path_or_stream: Configuration file path or file / file-like object
230
    :param forced_type: Forced configuration parser type
231
    :param is_path_: True if given `path_or_stream` is a file path
232
233
    :return: A tuple of (Parser class or None, "" or error message)
234
235
    >>> find_parser(None)
236
    Traceback (most recent call last):
237
    ValueError: path_or_stream or forced_type must be some value
238
239
    >>> find_parser(None, "ini")
240
    (<class 'anyconfig.backend.ini.Parser'>, '')
241
    >>> find_parser(None, "type_not_exist")
242
    (None, 'No parser found for given type: type_not_exist')
243
244
    >>> find_parser("cnf.json")
245
    (<class 'anyconfig.backend.json.Parser'>, '')
246
    >>> find_parser("cnf.json", is_path_=True)
247
    (<class 'anyconfig.backend.json.Parser'>, '')
248
    >>> find_parser("cnf.ext_not_found")
249
    (None, 'No parser found for given file: cnf.ext_not_found')
250
    """
251
    if not path_or_stream and forced_type is None:
252
        raise ValueError("path_or_stream or forced_type must be some value")
253
254
    err = ""
255
    if forced_type is not None:
256
        parser = find_by_type(forced_type)
257
        if parser is None:
258
            err = "No parser found for given type: %s" % forced_type
259
    else:
260
        parser = find_by_file(path_or_stream, is_path_=is_path_)
261
        if parser is None:
262
            err = "No parser found for given file: %s" % path_or_stream
263
264
    return (parser, err)
265
266
267
def list_types(cps=None):
268
    """List available config types.
269
    """
270
    if cps is None:
271
        cps = PARSERS
272
273
    return uniq(t for t, ps in list_parsers_by_type(cps))
274
275
# vim:sw=4:ts=4:et:
276