Completed
Push — master ( 9a0936...cac8f2 )
by Satoru
01:04
created

list_parsers_by_extension()   B

Complexity

Conditions 5

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 11
rs 8.5454
c 0
b 0
f 0
1
#
2
# Copyright (C) 2011 - 2016 Satoru SATOH <ssato @ redhat.com>
3
# License: MIT
4
#
5
# Suppress:
6
# - false-positive warn at '... pkg_resources ...' line
7
# - import positions after some globals are defined
8
# pylint: disable=no-member,wrong-import-position
9
"""A module to aggregate config parser (loader/dumper) backends.
10
"""
11
from __future__ import absolute_import
12
13
import itertools
14
import logging
15
import operator
16
import pkg_resources
17
18
import anyconfig.compat
19
import anyconfig.utils
20
21
import anyconfig.backend.base
22
import anyconfig.backend.ini
23
import anyconfig.backend.json
24
import anyconfig.backend.properties
25
import anyconfig.backend.shellvars
26
import anyconfig.backend.xml
27
28
LOGGER = logging.getLogger(__name__)
29
PARSERS = [anyconfig.backend.ini.Parser, anyconfig.backend.json.Parser,
30
           anyconfig.backend.properties.Parser,
31
           anyconfig.backend.shellvars.Parser, anyconfig.backend.xml.Parser]
32
33
_NA_MSG = "%s is not available. Disabled %s support."
34
35
try:
36
    import anyconfig.backend.yaml
37
    PARSERS.append(anyconfig.backend.yaml.Parser)
38
except ImportError:
39
    LOGGER.info(_NA_MSG, "yaml module", "YAML")
40
41
try:
42
    import anyconfig.backend.configobj
43
    PARSERS.append(anyconfig.backend.configobj.Parser)
44
except ImportError:
45
    LOGGER.info(_NA_MSG, "ConfigObj module", "its")
46
47
try:
48
    import anyconfig.backend.msgpack
49
    PARSERS.append(anyconfig.backend.msgpack.Parser)
50
except ImportError:
51
    LOGGER.info(_NA_MSG, "msgpack module", "MessagePack")
52
53
try:
54
    import anyconfig.backend.toml
55
    PARSERS.append(anyconfig.backend.toml.Parser)
56
except ImportError:
57
    LOGGER.info(_NA_MSG, "toml module", "TOML")
58
59
try:
60
    import anyconfig.backend.bson
61
    PARSERS.append(anyconfig.backend.bson.Parser)
62
except ImportError:
63
    LOGGER.info(_NA_MSG, "bson module in pymongo package", "BSON")
64
65
for e in pkg_resources.iter_entry_points("anyconfig_backends"):
66
    try:
67
        PARSERS.append(e.load())
68
    except ImportError:
69
        continue
70
71
72
def fst(tpl):
73
    """
74
    >>> fst((0, 1))
75
    0
76
    """
77
    return tpl[0]
78
79
80
def snd(tpl):
81
    """
82
    >>> snd((0, 1))
83
    1
84
    """
85
    return tpl[1]
86
87
88
def groupby_key(itr, keyfunc):
89
    """
90
    An wrapper function around itertools.groupby
91
92
    :param itr: Iterable object, a list/tuple/genrator, etc.
93
    :param keyfunc: Key function to sort `itr`.
94
95
    >>> itr = [("a", 1), ("b", -1), ("c", 1)]
96
    >>> res = groupby_key(itr, operator.itemgetter(1))
97
    >>> [(key, tuple(grp)) for key, grp in res]
98
    [(-1, (('b', -1),)), (1, (('a', 1), ('c', 1)))]
99
    """
100
    return itertools.groupby(sorted(itr, key=keyfunc), key=keyfunc)
101
102
103
def uniq(iterable, **kwopts):
104
    """sorted + uniq
105
106
    .. note::
107
       sorted(set(iterable), key=iterable.index) cannot be used for any
108
       iterables (generator, a list of dicts, etc.), I guess.
109
110
    :param iterable: Iterable objects, a list, generator, iterator, etc.
111
    :param kwopts: Keyword options passed to sorted()
112
    :return: a sorted list of items in iterable
113
114
    >>> uniq([1, 2, 3, 1, 2])
115
    [1, 2, 3]
116
    >>> uniq((i for i in (2, 10, 3, 2, 5, 1, 7, 3)))
117
    [1, 2, 3, 5, 7, 10]
118
    >>> uniq(({str(i): i} for i in (2, 10, 3, 2, 5, 1, 7, 3)),
119
    ...      key=lambda d: int(list(d.keys())[0]))
120
    [{'1': 1}, {'2': 2}, {'3': 3}, {'5': 5}, {'7': 7}, {'10': 10}]
121
    """
122
    return [t[0] for t in itertools.groupby(sorted(iterable, **kwopts))]
123
124
125
def is_parser(obj):
126
    """
127
    :return: True if given `obj` is an instance of parser.
128
129
    >>> is_parser("ini")
130
    False
131
    >>> is_parser(anyconfig.backend.base.Parser)
132
    False
133
    >>> is_parser(anyconfig.backend.base.Parser())
134
    True
135
    """
136
    return isinstance(obj, anyconfig.backend.base.Parser)
137
138
139
def _list_parsers_by_type(cps):
140
    """
141
    :param cps: A list of parser classes
142
    :return: List (generator) of (config_type, [config_parser])
143
    """
144
    return ((t, sorted(p, key=operator.methodcaller("priority"))) for t, p
145
            in groupby_key(cps, operator.methodcaller("type")))
146
147
148
def _list_xppairs(xps):
149
    """List config parsers by priority.
150
    """
151
    return sorted((snd(xp) for xp in xps),
152
                  key=operator.methodcaller("priority"))
153
154
155
def _list_parsers_by_extension(cps):
156
    """
157
    :param cps: A list of parser classes
158
    :return: List (generator) of (config_ext, [config_parser])
159
    """
160
    cps_by_ext = anyconfig.utils.concat(([(x, p) for x in p.extensions()] for p
161
                                         in cps))
162
163
    return ((x, _list_xppairs(xps)) for x, xps in groupby_key(cps_by_ext, fst))
164
165
166
def find_by_file(path_or_stream, cps=None, is_path_=False):
167
    """
168
    Find config parser by the extension of file `path_or_stream`, file path or
169
    stream (a file or file-like objects).
170
171
    :param path_or_stream: Config file path or file/file-like object
172
    :param cps: A list of pairs :: (type, parser_class)
173
    :param is_path_: True if given `path_or_stream` is a file path
174
175
    :return: Config Parser class found
176
177
    >>> find_by_file("a.missing_cnf_ext") is None
178
    True
179
    >>> strm = anyconfig.compat.StringIO()
180
    >>> find_by_file(strm) is None
181
    True
182
    >>> find_by_file("a.json")
183
    <class 'anyconfig.backend.json.Parser'>
184
    >>> find_by_file("a.json", is_path_=True)
185
    <class 'anyconfig.backend.json.Parser'>
186
    """
187
    if cps is None:
188
        cps = PARSERS
189
190
    if not is_path_ and not anyconfig.utils.is_path(path_or_stream):
191
        path_or_stream = anyconfig.utils.get_path_from_stream(path_or_stream)
192
        if path_or_stream is None:
193
            return None  # There is no way to detect file path.
194
195
    ext_ref = anyconfig.utils.get_file_extension(path_or_stream)
196
    for ext, psrs in _list_parsers_by_extension(cps):
197
        if ext == ext_ref:
198
            return psrs[-1]
199
200
    return None
201
202
203
def find_by_type(cptype, cps=None):
204
    """
205
    Find config parser by file's extension.
206
207
    :param cptype: Config file's type
208
    :param cps: A list of pairs :: (type, parser_class)
209
    :return: Config Parser class found
210
211
    >>> find_by_type("missing_type") is None
212
    True
213
    """
214
    if cps is None:
215
        cps = PARSERS
216
217
    for type_, psrs in _list_parsers_by_type(cps):
218
        if type_ == cptype:
219
            return psrs[-1] or None
220
221
    return None
222
223
224
def find_parser(path_or_stream, forced_type=None, is_path_=False):
225
    """
226
    Find out config parser object appropriate to load from a file of given path
227
    or file/file-like object.
228
229
    :param path_or_stream: Configuration file path or file / file-like object
230
    :param forced_type: Forced configuration parser type
231
    :param is_path_: True if given `path_or_stream` is a file path
232
233
    :return: A tuple of (Parser class or None, "" or error message)
234
235
    >>> find_parser(None)
236
    Traceback (most recent call last):
237
    ValueError: path_or_stream or forced_type must be some value
238
    >>> find_parser(None, "type_not_exist")
239
    Traceback (most recent call last):
240
    ValueError: No parser found for type 'type_not_exist'
241
    >>> find_parser("cnf.ext_not_found")
242
    Traceback (most recent call last):
243
    ValueError: No parser found for file 'cnf.ext_not_found'
244
245
    >>> find_parser(None, "ini")
246
    <class 'anyconfig.backend.ini.Parser'>
247
    >>> find_parser("cnf.json")
248
    <class 'anyconfig.backend.json.Parser'>
249
    >>> find_parser("cnf.json", is_path_=True)
250
    <class 'anyconfig.backend.json.Parser'>
251
    """
252
    if not path_or_stream and forced_type is None:
253
        raise ValueError("path_or_stream or forced_type must be some value")
254
255
    if forced_type is not None:
256
        parser = find_by_type(forced_type)
257
        if parser is None:
258
            raise ValueError("No parser found for type '%s'" % forced_type)
259
    else:
260
        parser = find_by_file(path_or_stream, is_path_=is_path_)
261
        if parser is None:
262
            raise ValueError("No parser found for file '%s'" % path_or_stream)
263
264
    return parser
265
266
267
def list_types(cps=None):
268
    """List available config types.
269
    """
270
    if cps is None:
271
        cps = PARSERS
272
273
    return uniq(t for t, ps in _list_parsers_by_type(cps))
274
275
# vim:sw=4:ts=4:et:
276