Completed
Push — master ( de1267...5c7fcc )
by Satoru
01:07
created

uniq()   A

Complexity

Conditions 2

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 20
rs 9.4285
c 0
b 0
f 0
1
#
2
# Copyright (C) 2011 - 2016 Satoru SATOH <ssato @ redhat.com>
3
# License: MIT
4
#
5
# Suppress:
6
# - false-positive warn at '... pkg_resources ...' line
7
# - import positions after some globals are defined
8
# pylint: disable=no-member,wrong-import-position
9
"""A module to aggregate config parser (loader/dumper) backends.
10
"""
11
from __future__ import absolute_import
12
13
import itertools
14
import logging
15
import operator
16
import pkg_resources
17
18
import anyconfig.compat
19
import anyconfig.utils
20
21
import anyconfig.backend.base
22
import anyconfig.backend.ini
23
import anyconfig.backend.json
24
import anyconfig.backend.properties
25
import anyconfig.backend.shellvars
26
import anyconfig.backend.xml
27
28
LOGGER = logging.getLogger(__name__)
29
PARSERS = [anyconfig.backend.ini.Parser, anyconfig.backend.json.Parser,
30
           anyconfig.backend.properties.Parser,
31
           anyconfig.backend.shellvars.Parser, anyconfig.backend.xml.Parser]
32
33
_NA_MSG = "%s is not available. Disabled %s support."
34
35
try:
36
    import anyconfig.backend.yaml
37
    PARSERS.append(anyconfig.backend.yaml.Parser)
38
except ImportError:
39
    LOGGER.info(_NA_MSG, "yaml module", "YAML")
40
41
try:
42
    import anyconfig.backend.configobj
43
    PARSERS.append(anyconfig.backend.configobj.Parser)
44
except ImportError:
45
    LOGGER.info(_NA_MSG, "ConfigObj module", "its")
46
47
try:
48
    import anyconfig.backend.msgpack
49
    PARSERS.append(anyconfig.backend.msgpack.Parser)
50
except ImportError:
51
    LOGGER.info(_NA_MSG, "msgpack module", "MessagePack")
52
53
try:
54
    import anyconfig.backend.toml
55
    PARSERS.append(anyconfig.backend.toml.Parser)
56
except ImportError:
57
    LOGGER.info(_NA_MSG, "toml module", "TOML")
58
59
try:
60
    import anyconfig.backend.bson
61
    PARSERS.append(anyconfig.backend.bson.Parser)
62
except ImportError:
63
    LOGGER.info(_NA_MSG, "bson module in pymongo package", "BSON")
64
65
try:
66
    import anyconfig.backend.cbor
67
    PARSERS.append(anyconfig.backend.cbor.Parser)
68
except ImportError:
69
    LOGGER.info(_NA_MSG, "cbor module in cbor package", "CBOR")
70
71
for e in pkg_resources.iter_entry_points("anyconfig_backends"):
72
    try:
73
        PARSERS.append(e.load())
74
    except ImportError:
75
        continue
76
77
78
class UnknownParserTypeError(RuntimeError):
79
    """Raise if no parsers were found for given type."""
80
    def __init__(self, forced_type):
81
        msg = "No parser found for type '%s'" % forced_type
82
        super(UnknownParserTypeError, self).__init__(msg)
83
84
85
class UnknownFileTypeError(RuntimeError):
86
    """Raise if not parsers were found for given file path."""
87
    def __init__(self, path):
88
        msg = "No parser found for file '%s'" % path
89
        super(UnknownFileTypeError, self).__init__(msg)
90
91
92
def fst(tpl):
93
    """
94
    >>> fst((0, 1))
95
    0
96
    """
97
    return tpl[0]
98
99
100
def snd(tpl):
101
    """
102
    >>> snd((0, 1))
103
    1
104
    """
105
    return tpl[1]
106
107
108
def groupby_key(itr, keyfunc):
109
    """
110
    An wrapper function around itertools.groupby
111
112
    :param itr: Iterable object, a list/tuple/genrator, etc.
113
    :param keyfunc: Key function to sort `itr`.
114
115
    >>> itr = [("a", 1), ("b", -1), ("c", 1)]
116
    >>> res = groupby_key(itr, operator.itemgetter(1))
117
    >>> [(key, tuple(grp)) for key, grp in res]
118
    [(-1, (('b', -1),)), (1, (('a', 1), ('c', 1)))]
119
    """
120
    return itertools.groupby(sorted(itr, key=keyfunc), key=keyfunc)
121
122
123
def is_parser(obj):
124
    """
125
    :return: True if given `obj` is an instance of parser.
126
127
    >>> is_parser("ini")
128
    False
129
    >>> is_parser(anyconfig.backend.base.Parser)
130
    False
131
    >>> is_parser(anyconfig.backend.base.Parser())
132
    True
133
    """
134
    return isinstance(obj, anyconfig.backend.base.Parser)
135
136
137
def _list_parsers_by_type(cps):
138
    """
139
    :param cps: A list of parser classes
140
    :return: List (generator) of (config_type, [config_parser])
141
    """
142
    return ((t, sorted(p, key=operator.methodcaller("priority"))) for t, p
143
            in groupby_key(cps, operator.methodcaller("type")))
144
145
146
def _list_xppairs(xps):
147
    """List config parsers by priority.
148
    """
149
    return sorted((snd(xp) for xp in xps),
150
                  key=operator.methodcaller("priority"))
151
152
153
def _list_parsers_by_extension(cps):
154
    """
155
    :param cps: A list of parser classes
156
    :return: List (generator) of (config_ext, [config_parser])
157
    """
158
    cps_by_ext = anyconfig.utils.concat(([(x, p) for x in p.extensions()] for p
159
                                         in cps))
160
161
    return ((x, _list_xppairs(xps)) for x, xps in groupby_key(cps_by_ext, fst))
162
163
164
_PARSERS_BY_TYPE = tuple(_list_parsers_by_type(PARSERS))
165
_PARSERS_BY_EXT = tuple(_list_parsers_by_extension(PARSERS))
166
167
168
def find_by_file(path_or_stream, cps=_PARSERS_BY_EXT, is_path_=False):
169
    """
170
    Find config parser by the extension of file `path_or_stream`, file path or
171
    stream (a file or file-like objects).
172
173
    :param path_or_stream: Config file path or file/file-like object
174
    :param cps:
175
        A tuple of pairs of (type, parser_class) or None if you want to compute
176
        this value dynamically.
177
    :param is_path_: True if given `path_or_stream` is a file path
178
179
    :return: Config Parser class found
180
181
    >>> find_by_file("a.missing_cnf_ext") is None
182
    True
183
    >>> strm = anyconfig.compat.StringIO()
184
    >>> find_by_file(strm) is None
185
    True
186
    >>> find_by_file("a.json")
187
    <class 'anyconfig.backend.json.Parser'>
188
    >>> find_by_file("a.json", is_path_=True)
189
    <class 'anyconfig.backend.json.Parser'>
190
    """
191
    if cps is None:
192
        cps = _list_parsers_by_extension(PARSERS)
193
194
    if not is_path_ and not anyconfig.utils.is_path(path_or_stream):
195
        path_or_stream = anyconfig.utils.get_path_from_stream(path_or_stream)
196
        if path_or_stream is None:
197
            return None  # There is no way to detect file path.
198
199
    ext_ref = anyconfig.utils.get_file_extension(path_or_stream)
200
    return next((psrs[-1] for ext, psrs in cps if ext == ext_ref), None)
201
202
203
def find_by_type(cptype, cps=_PARSERS_BY_TYPE):
204
    """
205
    Find config parser by file's extension.
206
207
    :param cptype: Config file's type
208
    :param cps:
209
        A list of pairs (type, parser_class) or None if you want to compute
210
        this value dynamically.
211
212
    :return: Config Parser class found
213
214
    >>> find_by_type("missing_type") is None
215
    True
216
    """
217
    if cps is None:
218
        cps = _list_parsers_by_type(PARSERS)
219
220
    return next((psrs[-1] or None for t, psrs in cps if t == cptype), None)
221
222
223
def find_parser(path_or_stream, forced_type=None, is_path_=False):
224
    """
225
    Find out config parser object appropriate to load from a file of given path
226
    or file/file-like object.
227
228
    :param path_or_stream: Configuration file path or file / file-like object
229
    :param forced_type: Forced configuration parser type
230
    :param is_path_: True if given `path_or_stream` is a file path
231
232
    :return: A tuple of (Parser class or None, "" or error message)
233
234
    >>> find_parser(None)  # doctest: +IGNORE_EXCEPTION_DETAIL
235
    Traceback (most recent call last):
236
    ValueError: path_or_stream or forced_type must be some value
237
    >>> find_parser(None, "type_not_exist"
238
    ...             )  # doctest: +IGNORE_EXCEPTION_DETAIL
239
    Traceback (most recent call last):
240
    UnknownParserTypeError: No parser found for type 'type_not_exist'
241
    >>> find_parser("cnf.ext_not_found"
242
    ...             )  # doctest: +IGNORE_EXCEPTION_DETAIL
243
    Traceback (most recent call last):
244
    UnknownFileTypeError: No parser found for file 'cnf.ext_not_found'
245
246
    >>> find_parser(None, "ini")
247
    <class 'anyconfig.backend.ini.Parser'>
248
    >>> find_parser("cnf.json")
249
    <class 'anyconfig.backend.json.Parser'>
250
    >>> find_parser("cnf.json", is_path_=True)
251
    <class 'anyconfig.backend.json.Parser'>
252
    """
253
    if not path_or_stream and forced_type is None:
254
        raise ValueError("path_or_stream or forced_type must be some value")
255
256
    if forced_type is not None:
257
        parser = find_by_type(forced_type)
258
        if parser is None:
259
            raise UnknownParserTypeError(forced_type)
260
    else:
261
        parser = find_by_file(path_or_stream, is_path_=is_path_)
262
        if parser is None:
263
            raise UnknownFileTypeError(path_or_stream)
264
265
    return parser
266
267
268
def list_types(cps=_PARSERS_BY_TYPE):
269
    """List available config types.
270
    """
271
    if cps is None:
272
        cps = _list_parsers_by_type(PARSERS)
273
274
    return sorted(set(next(anyconfig.compat.izip(*cps))))
275
276
# vim:sw=4:ts=4:et:
277