Completed
Push — master ( 6ef2f5...b75cb1 )
by Satoru
53s
created

find_by_file()   C

Complexity

Conditions 7

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 33
rs 5.5

1 Method

Rating   Name   Duplication   Size   Complexity  
A find_parser_by_type() 0 19 3
1
#
2
# Copyright (C) 2011 - 2018 Satoru SATOH <ssato @ redhat.com>
3
# License: MIT
4
#
5
# Suppress:
6
# - false-positive warn at '... pkg_resources ...' line
7
# - import positions after some globals are defined
8
# pylint: disable=no-member,wrong-import-position
9
"""A module to aggregate config parser (loader/dumper) backends.
10
"""
11
from __future__ import absolute_import
12
13
import itertools
14
import logging
15
import operator
16
import pkg_resources
17
18
import anyconfig.compat
19
import anyconfig.ioinfo
20
import anyconfig.utils
21
22
import anyconfig.backend.base
23
import anyconfig.backend.ini
24
import anyconfig.backend.json
25
import anyconfig.backend.pickle
26
import anyconfig.backend.properties
27
import anyconfig.backend.shellvars
28
import anyconfig.backend.xml
29
30
31
LOGGER = logging.getLogger(__name__)
32
PARSERS = [anyconfig.backend.ini.Parser, anyconfig.backend.json.Parser,
33
           anyconfig.backend.pickle.Parser,
34
           anyconfig.backend.properties.Parser,
35
           anyconfig.backend.shellvars.Parser, anyconfig.backend.xml.Parser]
36
37
_NA_MSG = "%s is not available. Disabled %s support."
38
39
try:
40
    import anyconfig.backend.yaml
41
    PARSERS.append(anyconfig.backend.yaml.Parser)
42
except ImportError:
43
    LOGGER.info(_NA_MSG, "yaml module", "YAML")
44
45
try:
46
    import anyconfig.backend.configobj
47
    PARSERS.append(anyconfig.backend.configobj.Parser)
48
except ImportError:
49
    LOGGER.info(_NA_MSG, "ConfigObj module", "its")
50
51
try:
52
    import anyconfig.backend.toml
53
    PARSERS.append(anyconfig.backend.toml.Parser)
54
except ImportError:
55
    LOGGER.info(_NA_MSG, "toml module", "TOML")
56
57
for e in pkg_resources.iter_entry_points("anyconfig_backends"):
58
    try:
59
        PARSERS.append(e.load())
60
    except ImportError:
61
        continue
62
63
64
def fst(tpl):
65
    """
66
    >>> fst((0, 1))
67
    0
68
    """
69
    return tpl[0]
70
71
72
def snd(tpl):
73
    """
74
    >>> snd((0, 1))
75
    1
76
    """
77
    return tpl[1]
78
79
80
def groupby_key(itr, keyfunc):
81
    """
82
    An wrapper function around itertools.groupby
83
84
    :param itr: Iterable object, a list/tuple/genrator, etc.
85
    :param keyfunc: Key function to sort `itr`.
86
87
    >>> itr = [("a", 1), ("b", -1), ("c", 1)]
88
    >>> res = groupby_key(itr, operator.itemgetter(1))
89
    >>> [(key, tuple(grp)) for key, grp in res]
90
    [(-1, (('b', -1),)), (1, (('a', 1), ('c', 1)))]
91
    """
92
    return itertools.groupby(sorted(itr, key=keyfunc), key=keyfunc)
93
94
95
def is_parser(obj):
96
    """
97
    :return: True if given `obj` is an instance of parser.
98
99
    >>> is_parser("ini")
100
    False
101
    >>> is_parser(anyconfig.backend.base.Parser)
102
    False
103
    >>> is_parser(anyconfig.backend.base.Parser())
104
    True
105
    """
106
    return isinstance(obj, anyconfig.backend.base.Parser)
107
108
109
def _list_parsers_by_type(cps):
110
    """
111
    :param cps: A list of parser classes
112
    :return: List (generator) of (config_type, [config_parser])
113
    """
114
    return ((t, sorted(p, key=operator.methodcaller("priority"))) for t, p
115
            in groupby_key(cps, operator.methodcaller("type")))
116
117
118
def _list_xppairs(xps):
119
    """List config parsers by priority.
120
    """
121
    return sorted((snd(xp) for xp in xps),
122
                  key=operator.methodcaller("priority"))
123
124
125
def _list_parsers_by_extension(cps):
126
    """
127
    :param cps: A list of parser classes
128
    :return: List (generator) of (config_ext, [config_parser])
129
    """
130
    cps_by_ext = anyconfig.utils.concat(([(x, p) for x in p.extensions()] for p
131
                                         in cps))
132
133
    return ((x, _list_xppairs(xps)) for x, xps in groupby_key(cps_by_ext, fst))
134
135
136
_PARSERS_BY_TYPE = tuple(_list_parsers_by_type(PARSERS))
137
_PARSERS_BY_EXT = tuple(_list_parsers_by_extension(PARSERS))
138
139
140
def inspect_io_obj(obj, cps_by_ext=_PARSERS_BY_EXT,
141
                   cps_by_type=_PARSERS_BY_TYPE, forced_type=None):
142
    """
143
    Inspect a given object `obj` which may be a path string, file / file-like
144
    object, pathlib.Path object or `~anyconfig.globals.IOInfo` namedtuple
145
    object, and find out appropriate parser object to load or dump from/to it
146
    along with other I/O information.
147
148
    :param obj:
149
        a file path string, file / file-like object, pathlib.Path object or
150
        `~anyconfig.globals.IOInfo` object
151
    :param forced_type: Forced type of parser to load or dump
152
153
    :return: anyconfig.globals.IOInfo object :: namedtuple
154
    :raises: ValueError, UnknownParserTypeError, UnknownFileTypeError
155
    """
156
    return anyconfig.ioinfo.make(obj, cps_by_ext, cps_by_type,
157
                                 forced_type=forced_type)
158
159
160
def find_parser_by_type(forced_type, cps_by_ext=_PARSERS_BY_EXT,
161
                        cps_by_type=_PARSERS_BY_TYPE):
162
    """
163
    Find out appropriate parser object to load inputs of given type.
164
165
    :param forced_type: Forced parser type
166
    :param cps_by_type: A list of pairs (parser_type, [parser_class])
167
168
    :return:
169
        An instance of :class:`~anyconfig.backend.base.Parser` or None means no
170
        appropriate parser was found
171
    :raises: UnknownParserTypeError
172
    """
173
    if forced_type is None or not forced_type:
174
        raise ValueError("forced_type must be a some string")
175
176
    return anyconfig.ioinfo.find_parser(None, cps_by_ext=cps_by_ext,
177
                                        cps_by_type=cps_by_type,
178
                                        forced_type=forced_type)
179
180
181
def find_parser(obj, forced_type=None):
182
    """
183
    Find out appropriate parser object to load from a file of given path or
184
    file/file-like object.
185
186
    :param obj:
187
        a file path string, file / file-like object, pathlib.Path object or
188
        `~anyconfig.globals.IOInfo` object
189
    :param forced_type: Forced configuration parser type
190
191
    :return: A tuple of (Parser class or None, "" or error message)
192
    :raises: ValueError, UnknownParserTypeError, UnknownFileTypeError
193
    """
194
    if anyconfig.utils.is_ioinfo(obj):
195
        return obj.parser  # It must have this.
196
197
    ioi = inspect_io_obj(obj, _PARSERS_BY_EXT, _PARSERS_BY_TYPE, forced_type)
198
    psr = ioi.parser
199
    LOGGER.debug("Using parser %r [%s] for input type %s",
200
                 psr, psr.type(), ioi.type)
201
    return psr
202
203
204
def list_types(cps=_PARSERS_BY_TYPE):
205
    """List available config types.
206
    """
207
    if cps is None:
208
        cps = _list_parsers_by_type(PARSERS)
209
210
    return sorted(set(next(anyconfig.compat.izip(*cps))))
211
212
# vim:sw=4:ts=4:et:
213