Completed
Push — file-actions ( 9b9199...28a134 )
by Felipe A.
33s
created

Barrier._uncork()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 2
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import os
5
import os.path
6
import sys
7
import abc
8
import itertools
9
10
import warnings
11
import functools
12
13
import posixpath
14
import ntpath
15
16
FS_ENCODING = sys.getfilesystemencoding()
17
PY_LEGACY = sys.version_info < (3, )
18
TRUE_VALUES = frozenset(('true', 'yes', '1', 'enable', 'enabled', True, 1))
19
20
try:
21
    from os import scandir, walk  # python 3.5+
22
except ImportError:
23
    from scandir import scandir, walk  # noqa
24
25
try:
26
    from shutil import get_terminal_size  # python 3.3+
27
except ImportError:
28
    from backports.shutil_get_terminal_size import get_terminal_size  # noqa
29
30
try:
31
    from queue import Queue, Empty, Full  # python 3
32
except ImportError:
33
    from Queue import Queue, Empty, Full  # noqa
34
35
try:
36
    from collections.abc import Generator  # python 3.3+
37
except ImportError:
38
    from backports_abc import Generator  # noqa
39
40
41
def isexec(path):
42
    '''
43
    Check if given path points to an executable file.
44
45
    :param path: file path
46
    :type path: str
47
    :return: True if executable, False otherwise
48
    :rtype: bool
49
    '''
50
    return os.path.isfile(path) and os.access(path, os.X_OK)
51
52
53
def fsdecode(path, os_name=os.name, fs_encoding=FS_ENCODING, errors=None):
54
    '''
55
    Decode given path.
56
57
    :param path: path will be decoded if using bytes
58
    :type path: bytes or str
59
    :param os_name: operative system name, defaults to os.name
60
    :type os_name: str
61
    :param fs_encoding: current filesystem encoding, defaults to autodetected
62
    :type fs_encoding: str
63
    :return: decoded path
64
    :rtype: str
65
    '''
66
    if not isinstance(path, bytes):
67
        return path
68
    if not errors:
69
        use_strict = PY_LEGACY or os_name == 'nt'
70
        errors = 'strict' if use_strict else 'surrogateescape'
71
    return path.decode(fs_encoding, errors=errors)
72
73
74
def fsencode(path, os_name=os.name, fs_encoding=FS_ENCODING, errors=None):
75
    '''
76
    Encode given path.
77
78
    :param path: path will be encoded if not using bytes
79
    :type path: bytes or str
80
    :param os_name: operative system name, defaults to os.name
81
    :type os_name: str
82
    :param fs_encoding: current filesystem encoding, defaults to autodetected
83
    :type fs_encoding: str
84
    :return: encoded path
85
    :rtype: bytes
86
    '''
87
    if isinstance(path, bytes):
88
        return path
89
    if not errors:
90
        use_strict = PY_LEGACY or os_name == 'nt'
91
        errors = 'strict' if use_strict else 'surrogateescape'
92
    return path.encode(fs_encoding, errors=errors)
93
94
95
def getcwd(fs_encoding=FS_ENCODING, cwd_fnc=os.getcwd):
96
    '''
97
    Get current work directory's absolute path.
98
    Like os.getcwd but garanteed to return an unicode-str object.
99
100
    :param fs_encoding: filesystem encoding, defaults to autodetected
101
    :type fs_encoding: str
102
    :param cwd_fnc: callable used to get the path, defaults to os.getcwd
103
    :type cwd_fnc: Callable
104
    :return: path
105
    :rtype: str
106
    '''
107
    path = fsdecode(cwd_fnc(), fs_encoding=fs_encoding)
108
    return os.path.abspath(path)
109
110
111
def getdebug(environ=os.environ, true_values=TRUE_VALUES):
112
    '''
113
    Get if app is expected to be ran in debug mode looking at environment
114
    variables.
115
116
    :param environ: environment dict-like object
117
    :type environ: collections.abc.Mapping
118
    :returns: True if debug contains a true-like string, False otherwise
119
    :rtype: bool
120
    '''
121
    return environ.get('DEBUG', '').lower() in true_values
122
123
124
def deprecated(func_or_text, environ=os.environ):
125
    '''
126
    Decorator used to mark functions as deprecated. It will result in a
127
    warning being emmitted hen the function is called.
128
129
    Usage:
130
131
    >>> @deprecated
132
    ... def fnc():
133
    ...     pass
134
135
    Usage (custom message):
136
137
    >>> @deprecated('This is deprecated')
138
    ... def fnc():
139
    ...     pass
140
141
    :param func_or_text: message or callable to decorate
142
    :type func_or_text: callable
143
    :param environ: optional environment mapping
144
    :type environ: collections.abc.Mapping
145
    :returns: nested decorator or new decorated function (depending on params)
146
    :rtype: callable
147
    '''
148
    def inner(func):
149
        message = (
150
            'Deprecated function {}.'.format(func.__name__)
151
            if callable(func_or_text) else
152
            func_or_text
153
            )
154
155
        @functools.wraps(func)
156
        def new_func(*args, **kwargs):
157
            with warnings.catch_warnings():
158
                if getdebug(environ):
159
                    warnings.simplefilter('always', DeprecationWarning)
160
                warnings.warn(message, category=DeprecationWarning,
161
                              stacklevel=3)
162
            return func(*args, **kwargs)
163
        return new_func
164
    return inner(func_or_text) if callable(func_or_text) else inner
165
166
167
def usedoc(other):
168
    '''
169
    Decorator which copies __doc__ of given object into decorated one.
170
171
    Usage:
172
173
    >>> def fnc1():
174
    ...     """docstring"""
175
    ...     pass
176
    >>> @usedoc(fnc1)
177
    ... def fnc2():
178
    ...     pass
179
    >>> fnc2.__doc__
180
    'docstring'collections.abc.D
181
182
    :param other: anything with a __doc__ attribute
183
    :type other: any
184
    :returns: decorator function
185
    :rtype: callable
186
    '''
187
    def inner(fnc):
188
        fnc.__doc__ = fnc.__doc__ or getattr(other, '__doc__')
189
        return fnc
190
    return inner
191
192
193
def pathsplit(value, sep=os.pathsep):
194
    '''
195
    Get enviroment PATH elements as list.
196
197
    This function only cares about spliting across OSes.
198
199
    :param value: path string, as given by os.environ['PATH']
200
    :type value: str
201
    :param sep: PATH separator, defaults to os.pathsep
202
    :type sep: str
203
    :yields: every path
204
    :ytype: str
205
    '''
206
    for part in value.split(sep):
207
        if part[:1] == part[-1:] == '"' or part[:1] == part[-1:] == '\'':
208
            part = part[1:-1]
209
        yield part
210
211
212
def pathparse(value, sep=os.pathsep, os_sep=os.sep):
213
    '''
214
    Get enviroment PATH directories as list.
215
216
    This function cares about spliting, escapes and normalization of paths
217
    across OSes.
218
219
    :param value: path string, as given by os.environ['PATH']
220
    :type value: str
221
    :param sep: PATH separator, defaults to os.pathsep
222
    :type sep: str
223
    :param os_sep: OS filesystem path separator, defaults to os.sep
224
    :type os_sep: str
225
    :yields: every path
226
    :ytype: str
227
    '''
228
    escapes = []
229
    normpath = ntpath.normpath if os_sep == '\\' else posixpath.normpath
230
    if '\\' not in (os_sep, sep):
231
        escapes.extend((
232
            ('\\\\', '<ESCAPE-ESCAPE>', '\\'),
233
            ('\\"', '<ESCAPE-DQUOTE>', '"'),
234
            ('\\\'', '<ESCAPE-SQUOTE>', '\''),
235
            ('\\%s' % sep, '<ESCAPE-PATHSEP>', sep),
236
            ))
237
    for original, escape, unescape in escapes:
238
        value = value.replace(original, escape)
239
    for part in pathsplit(value, sep=sep):
240
        if part[-1:] == os_sep and part != os_sep:
241
            part = part[:-1]
242
        for original, escape, unescape in escapes:
243
            part = part.replace(escape, unescape)
244
        yield normpath(fsdecode(part))
245
246
247
def pathconf(path,
248
             os_name=os.name,
249
             isdir_fnc=os.path.isdir,
250
             pathconf_fnc=getattr(os, 'pathconf', None),
251
             pathconf_names=getattr(os, 'pathconf_names', ())):
252
    '''
253
    Get all pathconf variables for given path.
254
255
    :param path: absolute fs path
256
    :type path: str
257
    :returns: dictionary containing pathconf keys and their values (both str)
258
    :rtype: dict
259
    '''
260
261
    if pathconf_fnc and pathconf_names:
262
        return {key: pathconf_fnc(path, key) for key in pathconf_names}
263
    if os_name == 'nt':
264
        maxpath = 246 if isdir_fnc(path) else 259  # 260 minus <END>
265
    else:
266
        maxpath = 255  # conservative sane default
267
    return {
268
        'PC_PATH_MAX': maxpath,
269
        'PC_NAME_MAX': maxpath - len(path),
270
        }
271
272
273
ENV_PATH = tuple(pathparse(os.getenv('PATH', '')))
274
ENV_PATHEXT = tuple(pathsplit(os.getenv('PATHEXT', '')))
275
276
277
def which(name,
278
          env_path=ENV_PATH,
279
          env_path_ext=ENV_PATHEXT,
280
          is_executable_fnc=isexec,
281
          path_join_fnc=os.path.join,
282
          os_name=os.name):
283
    '''
284
    Get command absolute path.
285
286
    :param name: name of executable command
287
    :type name: str
288
    :param env_path: OS environment executable paths, defaults to autodetected
289
    :type env_path: list of str
290
    :param is_executable_fnc: callable will be used to detect if path is
291
                              executable, defaults to `isexec`
292
    :type is_executable_fnc: Callable
293
    :param path_join_fnc: callable will be used to join path components
294
    :type path_join_fnc: Callable
295
    :param os_name: os name, defaults to os.name
296
    :type os_name: str
297
    :return: absolute path
298
    :rtype: str or None
299
    '''
300
    for path in env_path:
301
        for suffix in env_path_ext:
302
            exe_file = path_join_fnc(path, name) + suffix
303
            if is_executable_fnc(exe_file):
304
                return exe_file
305
    return None
306
307
308
def re_escape(pattern, chars=frozenset("()[]{}?*+|^$\\.-#")):
309
    '''
310
    Escape all special regex characters in pattern and converts non-ascii
311
    characters into unicode escape sequences.
312
313
    Logic taken from regex module.
314
315
    :param pattern: regex pattern to escape
316
    :type patterm: str
317
    :returns: escaped pattern
318
    :rtype: str
319
    '''
320
    chr_escape = '\\{}'.format
321
    uni_escape = '\\u{:04d}'.format
322
    return ''.join(
323
        chr_escape(c) if c in chars or c.isspace() else
324
        c if '\x19' < c < '\x80' else
325
        uni_escape(ord(c))
326
        for c in pattern
327
        )
328
329
330
if PY_LEGACY:
331
332
    class FileNotFoundError(BaseException):
333
        __metaclass__ = abc.ABCMeta
334
335
    FileNotFoundError.register(OSError)
336
    FileNotFoundError.register(IOError)
337
338
    range = xrange  # noqa
339
    filter = itertools.ifilter
340
    map = itertools.imap
341
    basestring = basestring  # noqa
342
    unicode = unicode  # noqa
343
    chr = unichr  # noqa
344
    bytes = str  # noqa
345
else:
346
    FileNotFoundError = FileNotFoundError
347
    range = range
348
    filter = filter
349
    map = map
350
    basestring = str
351
    unicode = str
352
    chr = chr
353
    bytes = bytes
354