1
|
|
|
# |
2
|
|
|
# Copyright (C) 2018 Satoru SATOH <ssato @ redhat.com> |
3
|
|
|
# License: MIT |
4
|
|
|
# |
5
|
|
|
# pylint: disable=invalid-name |
6
|
|
|
r"""Functions for value objects representing inputs and outputs.
7
|
|
|
|
8
|
|
|
.. versionadded:: 0.9.5 |
9
|
|
|
|
10
|
|
|
- Add functions to make and process input and output objects holding some
11
|
|
|
attributes like input and output type (path, stream or pathlib.Path object), |
12
|
|
|
path, opener, etc. |
13
|
|
|
""" |
14
|
|
|
from __future__ import absolute_import |
15
|
|
|
|
16
|
|
|
import anyconfig.utils |
17
|
|
|
|
18
|
|
|
from anyconfig.globals import ( |
19
|
|
|
IOInfo, IOI_NONE, IOI_PATH_STR, IOI_PATH_OBJ, IOI_STREAM, |
20
|
|
|
UnknownFileTypeError, UnknownParserTypeError |
21
|
|
|
) |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
def guess_io_type(obj):
    """Classify ``obj`` as one of the known input / output kinds.

    The checks run in a fixed order: ``None`` first, then path string, then
    path object; whatever is left is reported as a stream.

    :param obj: a path string, a pathlib.Path or a file / file-like object
    :return: IOInfo type defined in anyconfig.globals.IOI_TYPES

    >>> apath = "/path/to/a_conf.ext"
    >>> assert guess_io_type(apath) == IOI_PATH_STR

    >>> from anyconfig.compat import pathlib
    >>> if pathlib is not None:
    ...     assert guess_io_type(pathlib.Path(apath)) == IOI_PATH_OBJ
    >>> assert guess_io_type(open(__file__)) == IOI_STREAM
    """
    if obj is None:
        return IOI_NONE

    if anyconfig.utils.is_path(obj):
        return IOI_PATH_STR

    if anyconfig.utils.is_path_obj(obj):
        return IOI_PATH_OBJ

    # Nothing above matched, so fall back to the stream kind.
    return IOI_STREAM
46
|
|
|
|
47
|
|
|
|
48
|
|
|
def inspect_io_obj(obj):
    """Work out the kind, the path and the opener callable of ``obj``.

    :param obj: a path string, a pathlib.Path or a file / file-like object

    :return: A tuple of (objtype, objpath, objopener)
    :raises: UnknownFileTypeError
    """
    kind = guess_io_type(obj)

    if kind == IOI_PATH_STR:
        return (kind, anyconfig.utils.normpath(obj), open)

    if kind == IOI_PATH_OBJ:
        # The path object brings its own ``open`` method.
        return (kind, anyconfig.utils.normpath(obj.as_posix()), obj.open)

    if kind == IOI_STREAM:
        return (kind, anyconfig.utils.get_path_from_stream(obj),
                anyconfig.utils.noop)

    if kind == IOI_NONE:
        return (kind, None, anyconfig.utils.noop)

    # Only reached if guess_io_type yields a kind not handled above.
    raise UnknownFileTypeError("%r" % obj)
73
|
|
|
|
74
|
|
|
|
75
|
|
|
def find_by_fileext(fileext, cps_by_ext):
    """Look up the parser class registered for the file extension.

    The first pair in ``cps_by_ext`` whose extension equals ``fileext`` wins,
    and the last parser class of that pair's list is returned.

    :param fileext: File extension
    :param cps_by_ext: A list of pairs (file_extension, [parser_class])

    :return: Most appropriate parser class to process given file, or None
        if no pair matches

    >>> from anyconfig.backends import _PARSERS_BY_EXT as cps
    >>> find_by_fileext("json", cps)
    <class 'anyconfig.backend.json.Parser'>
    >>> find_by_fileext("ext_should_not_be_found", cps) is None
    True
    """
    for known_ext, parsers in cps_by_ext:
        if known_ext == fileext:
            return parsers[-1]

    return None
90
|
|
|
|
91
|
|
|
|
92
|
|
|
def find_by_filepath(filepath, cps_by_ext):
    """Look up the parser class for a file by the extension of its path.

    The extension is taken with anyconfig.utils.get_file_extension and then
    resolved with find_by_fileext.

    :param filepath: Path to the file to find out parser to process it
    :param cps_by_ext: A list of pairs (file_extension, [parser_class])

    :return: Most appropriate parser class to process given file

    >>> from anyconfig.backends import _PARSERS_BY_EXT as cps
    >>> find_by_filepath("/a/b/c/x.json", cps)
    <class 'anyconfig.backend.json.Parser'>
    >>> find_by_filepath("/path/to/a.ext_should_not_be_found", cps) is None
    True
    """
    return find_by_fileext(anyconfig.utils.get_file_extension(filepath),
                           cps_by_ext)
107
|
|
|
|
108
|
|
|
|
109
|
|
|
def find_by_type(cptype, cps_by_type):
    """Look up the parser class registered for the parser type.

    The first pair in ``cps_by_type`` whose type equals ``cptype`` wins, and
    the last parser class of that pair's list is returned (a falsy entry is
    reported as None).

    :param cptype: Config file's type
    :param cps_by_type: A list of pairs (parser_type, [parser_class])

    :return: Most appropriate parser class to process given type or None

    >>> from anyconfig.backends import _PARSERS_BY_TYPE as cps
    >>> find_by_type("json", cps)
    <class 'anyconfig.backend.json.Parser'>
    >>> find_by_type("missing_type", cps) is None
    True
    """
    for known_type, parsers in cps_by_type:
        if known_type == cptype:
            return parsers[-1] or None

    return None
124
|
|
|
|
125
|
|
|
|
126
|
|
|
def find_parser(ipath, cps_by_ext, cps_by_type, forced_type=None):
    """Find the parser for ``ipath`` or ``forced_type`` and instantiate it.

    If ``forced_type`` is already a parser object it is returned as is.
    Otherwise the parser class is looked up by ``forced_type`` when one is
    given, or by the file extension of ``ipath`` when it is not.

    :param ipath: file path
    :param cps_by_ext: A list of pairs (file_extension, [parser_class])
    :param cps_by_type: A list of pairs (parser_type, [parser_class])
    :param forced_type: Forced configuration parser type or parser object

    :return: Instance of parser class appropriate for the input `ipath`
    :raises: ValueError, UnknownParserTypeError, UnknownFileTypeError
    """
    if not ipath and forced_type is None:
        raise ValueError("ipath or forced_type must be some value")

    # Import the base parser class explicitly: this module's top-level imports
    # do not load anyconfig.backend.base, so referring to it through the
    # 'anyconfig' package only worked if some other module had imported it
    # first. It is imported at function scope to stay clear of import cycles.
    from anyconfig.backend.base import Parser as _BaseParser

    if isinstance(forced_type, _BaseParser):
        return forced_type

    if forced_type is None:
        parser = find_by_filepath(ipath, cps_by_ext)
        if parser is None:
            raise UnknownFileTypeError(ipath)
    else:
        parser = find_by_type(forced_type, cps_by_type)
        if parser is None:
            raise UnknownParserTypeError(forced_type)

    return parser()
154
|
|
|
|
155
|
|
|
|
156
|
|
|
def make(obj, cps_by_ext, cps_by_type, forced_type=None):
    """Build an IOInfo object describing ``obj``.

    An object that anyconfig.utils.is_ioinfo already recognises is returned
    unchanged.

    :param obj: a path string, a pathlib.Path or a file / file-like object
    :param cps_by_ext: A list of pairs (file_extension, [parser_class])
    :param cps_by_type: A list of pairs (parser_type, [parser_class])
    :param forced_type: Forced configuration parser type

    :return:
        Namedtuple object represents a kind of input object such as a file /
        file-like object, path string or pathlib.Path object

    :raises: ValueError, UnknownParserTypeError, UnknownFileTypeError
    """
    if anyconfig.utils.is_ioinfo(obj):
        return obj

    # Without an object there is nothing to derive the parser from, so a
    # forced type becomes mandatory.
    if not obj and forced_type is None:
        raise ValueError("obj or forced_type must be some value")

    kind, path, opener = inspect_io_obj(obj)
    parser = find_parser(path, cps_by_ext, cps_by_type,
                         forced_type=forced_type)

    return IOInfo(src=obj, type=kind, path=path, parser=parser,
                  opener=opener)
179
|
|
|
|
180
|
|
|
# vim:sw=4:ts=4:et: |
181
|
|
|
|