Conditions | 6 |
Total Lines | 12 |
Code Lines | 8 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Metric | Value |
---|---|
cc | 6 |
eloc | 8 |
nop | 3 |
dl | 0 |
loc | 12 |
rs | 8.6666 |
c | 0 |
b | 0 |
f | 0 |
1 | import typing |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
2 | import os, io, shutil, gzip, platform, re |
||
0 ignored issues
–
show
|
|||
3 | from enum import Enum |
||
4 | |||
5 | from pocketutils.core.exceptions import PathIsNotADirError, MissingResourceError |
||
0 ignored issues
–
show
|
|||
6 | |||
7 | import json, sys |
||
0 ignored issues
–
show
|
|||
8 | import unicodedata |
||
0 ignored issues
–
show
|
|||
9 | from itertools import chain |
||
0 ignored issues
–
show
|
|||
10 | import signal |
||
0 ignored issues
–
show
|
|||
11 | import operator |
||
0 ignored issues
–
show
|
|||
12 | from datetime import date, datetime |
||
0 ignored issues
–
show
|
|||
13 | from typing import Callable, TypeVar, Iterator, Iterable, Optional, List, Any, Sequence, Mapping, Dict, ItemsView, KeysView, ValuesView, Tuple, Union |
||
0 ignored issues
–
show
|
|||
14 | from hurry.filesize import size as hsize |
||
0 ignored issues
–
show
|
|||
15 | |||
16 | from pocketutils.core.exceptions import LookupFailedError, MultipleMatchesError, ParsingError, ResourceError, MissingConfigKeyError, \ |
||
0 ignored issues
–
show
|
|||
17 | NaturalExpectedError, HashValidationError, FileDoesNotExistError |
||
18 | |||
19 | import logging |
||
0 ignored issues
–
show
|
|||
20 | import subprocess |
||
0 ignored issues
–
show
|
|||
21 | from subprocess import Popen, PIPE |
||
0 ignored issues
–
show
|
|||
22 | from queue import Queue |
||
0 ignored issues
–
show
|
|||
23 | from threading import Thread |
||
0 ignored issues
–
show
|
|||
24 | from deprecated import deprecated |
||
0 ignored issues
–
show
|
|||
25 | |||
26 | import datetime |
||
0 ignored issues
–
show
|
|||
27 | #from datetime import datetime |
||
28 | from subprocess import Popen, PIPE |
||
0 ignored issues
–
show
|
|||
29 | from infix import shift_infix, mod_infix, sub_infix |
||
0 ignored issues
–
show
|
|||
30 | |||
31 | from colorama import Fore |
||
0 ignored issues
–
show
|
|||
32 | import shutil |
||
0 ignored issues
–
show
|
|||
33 | import argparse |
||
0 ignored issues
–
show
|
|||
34 | |||
35 | import hashlib |
||
0 ignored issues
–
show
|
|||
36 | import codecs |
||
0 ignored issues
–
show
|
|||
37 | import gzip |
||
0 ignored issues
–
show
|
|||
38 | |||
39 | import struct |
||
0 ignored issues
–
show
|
|||
40 | from array import array |
||
0 ignored issues
–
show
|
|||
41 | import numpy as np |
||
0 ignored issues
–
show
|
|||
42 | |||
def blob_to_byte_array(bytes_obj: bytes):
    """Decode a blob of signed bytes into a ``np.ubyte`` array offset by +128.

    Each byte is read as a signed 8-bit value (-128..127) and shifted by 128,
    so the returned values lie in 0..255.
    """
    # Widen first: building a uint8 array directly from negative Python ints
    # depends on silent wrap-around, which newer NumPy releases reject.
    widened = _blob_to_dt(bytes_obj, 'b', 1, np.int16) + 128
    return widened.astype(np.ubyte)


def blob_to_float_array(bytes_obj: bytes):
    """Decode a blob of big-endian 4-byte floats into a ``np.float32`` array."""
    return _blob_to_dt(bytes_obj, 'f', 4, np.float32)


def blob_to_double_array(bytes_obj: bytes):
    """Decode a blob of big-endian 8-byte floats into a ``np.float64`` array."""
    return _blob_to_dt(bytes_obj, 'd', 8, np.float64)


def blob_to_short_array(bytes_obj: bytes):
    """Decode a blob of big-endian signed 2-byte ints into a ``np.int16`` array."""
    # Signed struct code so that the parsed range matches the signed dtype.
    return _blob_to_dt(bytes_obj, 'h', 2, np.int16)


def blob_to_int_array(bytes_obj: bytes):
    """Decode a blob of big-endian signed 4-byte ints into a ``np.int32`` array."""
    return _blob_to_dt(bytes_obj, 'i', 4, np.int32)


def blob_to_long_array(bytes_obj: bytes):
    """Decode a blob of big-endian signed 8-byte ints into a ``np.int64`` array."""
    return _blob_to_dt(bytes_obj, 'q', 8, np.int64)


def _blob_to_dt(bytes_obj: bytes, data_type_str: str, data_type_len: int, dtype):
    """Unpack ``bytes_obj`` as consecutive big-endian values into a NumPy array.

    :param bytes_obj: Raw bytes; its length must be a multiple of ``data_type_len``
    :param data_type_str: A single ``struct`` format character
    :param data_type_len: Size in bytes of one element of ``data_type_str``
    :param dtype: NumPy dtype of the returned array
    :return: A 1-D array with ``len(bytes_obj) // data_type_len`` elements
             (empty for empty input)
    :raises struct.error: If the length is not a multiple of ``data_type_len``
    """
    count = len(bytes_obj) // data_type_len
    # A repeat count keeps the format string short regardless of blob size.
    values = struct.unpack('>{}{}'.format(count, data_type_str), bytes_obj)
    return np.array(values, dtype=dtype)
||
0 ignored issues
–
show
|
|||
57 | |||
class FileHasher:
    """Computes hex digests of files and stores/validates them in sidecar files.

    The sidecar for ``file_name`` is the file ``file_name + extension``.
    """

    def __init__(self, algorithm: Callable[[], Any]=hashlib.sha1, extension: str='.sha1', buffer_size: int = 16*1024):
        """
        :param algorithm: Zero-argument factory returning a hashlib-style object
        :param extension: Suffix appended to a file name to get its sidecar path
        :param buffer_size: Number of bytes read per chunk while hashing
        """
        self.algorithm = algorithm
        self.extension = extension
        self.buffer_size = buffer_size

    def hashsum(self, file_name: str) -> str:
        """Returns the hex digest of the contents of ``file_name``."""
        digest = self.algorithm()
        with open(file_name, 'rb') as stream:
            # read fixed-size chunks until read() returns the empty-bytes sentinel
            for chunk in iter(lambda: stream.read(self.buffer_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def add_hash(self, file_name: str) -> None:
        """Writes the hex digest of ``file_name`` to its sidecar file."""
        # Hash first so a failure to read the data file never leaves an empty sidecar behind.
        digest = self.hashsum(file_name)
        with open(file_name + self.extension, 'w', encoding="utf8") as stream:
            stream.write(digest)

    def check_hash(self, file_name: str) -> bool:
        """Returns True iff the sidecar exists and its stored digest matches the file."""
        if not os.path.isfile(file_name + self.extension):
            return False
        return self._read_stored_hash(file_name) == self.hashsum(file_name)

    def check_and_open(self, file_name: str, *args):
        """Validates the hash, then opens the file with ``codecs.open`` as UTF-8.

        Extra positional ``args`` are forwarded to ``codecs.open`` after the path.
        """
        return self._o(
            file_name,
            lambda path, *extra: codecs.open(path, *extra, encoding='utf-8'),
            *args
        )

    def check_and_open_gzip(self, file_name: str, *args):
        """Validates the hash, then opens the file with ``gzip.open``.

        Extra positional ``args`` are forwarded to ``gzip.open`` after the path.
        """
        return self._o(file_name, gzip.open, *args)

    def _read_stored_hash(self, file_name: str) -> str:
        """Returns the first whitespace-separated token of the sidecar, or '' if it is empty."""
        with open(file_name + self.extension, 'r', encoding="utf8") as stream:
            tokens = stream.read().split()
        # only the first token counts, so "digest  filename" style sidecars also validate
        return tokens[0] if tokens else ''

    def _o(self, file_name: str, opener, *args):
        """Validates ``file_name`` against its sidecar and returns ``opener(file_name, *args)``.

        :raises FileDoesNotExistError: If the sidecar file does not exist
        :raises HashValidationError: If the stored digest differs from the computed one
        """
        if not os.path.isfile(file_name + self.extension):
            raise FileDoesNotExistError("Hash for file {} does not exist".format(file_name))
        if self._read_stored_hash(file_name) != self.hashsum(file_name):
            raise HashValidationError("Hash for file {} does not match".format(file_name))
        return opener(file_name, *args)
||
0 ignored issues
–
show
|
|||
96 | |||
def mkdatetime(s: str) -> datetime.datetime:
    """Parses 'YYYY-mm-dd HH:MM:SS' (separated by a space or a 'T') into a datetime."""
    # strptime lives on the datetime class, reached here through the datetime module
    return datetime.datetime.strptime(s.replace(' ', 'T'), "%Y-%m-%dT%H:%M:%S")
||
0 ignored issues
–
show
|
|||
99 | |||
def now() -> datetime.datetime:
    """Returns the current local date and time."""
    return datetime.datetime.now()
||
0 ignored issues
–
show
|
|||
102 | |||
def today() -> datetime.datetime:
    """Returns the current local date and time via ``datetime.datetime.today()``."""
    return datetime.datetime.today()
||
0 ignored issues
–
show
|
|||
105 | |||
def mkdate(s: str) -> datetime.datetime:
    """Parses 'YYYY-mm-dd' into a datetime at midnight of that day."""
    # strptime lives on the datetime class, reached here through the datetime module
    return datetime.datetime.strptime(s, "%Y-%m-%d")
||
0 ignored issues
–
show
|
|||
108 | |||
def this_year(s: str) -> datetime.datetime:
    """Parses a four-digit year string into a datetime at the start of that year."""
    # strptime lives on the datetime class, reached here through the datetime module
    return datetime.datetime.strptime(s, "%Y")
||
0 ignored issues
–
show
|
|||
111 | |||
def year_range(year: int) -> Tuple[datetime.datetime, datetime.datetime]:
    """Returns the first and the last representable instants of ``year``.

    :return: (Jan 1 00:00:00.000000, Dec 31 23:59:59.999999) as naive datetimes
    """
    return (
        datetime.datetime(year, 1, 1, 0, 0, 0, 0),
        # the 7th argument is microseconds, so the last instant is 999999, not 999
        datetime.datetime(year, 12, 31, 23, 59, 59, 999999)
    )
||
117 | |||
@shift_infix
def approxeq(a, b):
    """Returns True if a and b agree to a relative tolerance of 1e-09.

    This takes 1e-09 * max(abs(a), abs(b)), which may not be appropriate
    (for example, for values very close to zero).
    Example: 5 <<approxeq>> 5.000000000000001
    """
    # '<=' rather than '<' so that identical values compare equal even when both are 0
    return abs(a - b) <= 1e-09 * max(abs(a), abs(b))
||
0 ignored issues
–
show
|
|||
123 | |||
class TomlData:
    """A better TOML data structure than a plain dict.

    Usage examples:
        data = TomlData({'x': {'y': {'z': 155}}})
        print(data['x.y.z'])   # prints 155
        data.sub('x.y')        # returns a new TomlData for {'z': 155}
        data.nested_keys()     # yields the dotted path of every leaf key
    """

    def __init__(self, top_level_item: Dict[str, object]):
        assert top_level_item is not None
        self.top = top_level_item

    def __str__(self) -> str:
        return repr(self)

    def __repr__(self) -> str:
        return "TomlData({})".format(str(self.top))

    def __getitem__(self, key: str) -> object:
        """Returns the raw value (a table or a leaf) stored under the dotted ``key``."""
        return self.sub(key).top

    def __contains__(self, key: str) -> bool:
        """Returns True iff the dotted ``key`` can be resolved."""
        try:
            self.sub(key)
        except (MissingConfigKeyError, AttributeError):
            # AttributeError is kept for keys that have no .split (i.e. are not strings)
            return False
        return True

    def get_str(self, key: str) -> str:
        return str(self.__as(key, str))

    def get_int(self, key: str) -> int:
        # noinspection PyTypeChecker
        return int(self.__as(key, int))

    def get_bool(self, key: str) -> bool:
        # noinspection PyTypeChecker
        return bool(self.__as(key, bool))

    def get_float(self, key: str) -> float:
        # noinspection PyTypeChecker
        return float(self.__as(key, float))

    def get_str_list(self, key: str) -> List[str]:
        return self.__as_list(key, str)

    def get_int_list(self, key: str) -> List[int]:
        return self.__as_list(key, int)

    def get_float_list(self, key: str) -> List[float]:
        return self.__as_list(key, float)

    def get_bool_list(self, key: str) -> List[bool]:
        return self.__as_list(key, bool)

    def __as_list(self, key: str, clazz):
        """Returns the list under ``key`` after checking every element is a ``clazz``.

        :raises TypeError: If the value is not a list/tuple or an element has the wrong type
        """
        values = self[key]
        if not isinstance(values, (list, tuple)):
            # otherwise a plain string would silently be split into characters
            raise TypeError("{}={} is a {}, not a list of {}".format(key, values, type(values), clazz))

        def checked(value):
            if not isinstance(value, clazz):
                raise TypeError("{}={} is a {}, not {}".format(key, value, type(value), clazz))
            return value

        return [checked(value) for value in values]

    def __as(self, key: str, clazz):
        """Returns the value under ``key``; raises TypeError unless it is a ``clazz``."""
        value = self[key]
        if not isinstance(value, clazz):
            raise TypeError("{}={} is a {}, not {}".format(key, value, type(value), clazz))
        return value

    def sub(self, key: str):
        """Returns a new TomlData whose top is the item reached by following dotted ``key``.

        :raises MissingConfigKeyError: If any component of ``key`` cannot be resolved
        """
        parts = key.split('.')
        node = self.top
        for depth, part in enumerate(parts):
            # a leaf reached before the key is exhausted is as much a miss as an absent key
            if not isinstance(node, Mapping) or part not in node:
                raise MissingConfigKeyError(
                    "{} is not in the TOML; failed at {}"
                    .format(key, '.'.join(parts[:depth + 1]))
                )
            node = node[part]
        return TomlData(node)

    def items(self) -> ItemsView[str, object]:
        return self.top.items()

    def keys(self) -> KeysView[str]:
        return self.top.keys()

    def values(self) -> ValuesView[object]:
        return self.top.values()

    def nested_keys(self, separator='.') -> Iterator[str]:
        """Yields the path of every leaf key, with components joined by ``separator``."""
        for path in self.nested_key_lists(self.top):
            yield separator.join(path)

    def nested_key_lists(self, dictionary: Dict[str, object], prefix=None) -> Iterator[List[str]]:
        """Yields, for every leaf under ``dictionary``, the list of keys leading to it.

        :param prefix: Keys already traversed, outermost first; not modified
        """
        prefix = prefix[:] if prefix else []

        if isinstance(dictionary, dict):
            for key, value in dictionary.items():
                if isinstance(value, dict):
                    # append (not prepend) so that paths read outermost-to-innermost
                    yield from self.nested_key_lists(value, prefix + [key])
                else:
                    yield prefix + [key]
        else:
            # NOTE(review): a non-dict argument is yielded as-is rather than as a key list
            yield dictionary
||
0 ignored issues
–
show
|
|||
227 | |||
228 | |||
def git_commit_hash(git_repo_dir: str='.') -> str:
    """Gets the hex of the most recent Git commit hash in git_repo_dir.

    :raises ValueError: If ``git rev-parse HEAD`` exits with a nonzero code
    """
    proc = Popen(['git', 'rev-parse', 'HEAD'], stdout=PIPE, cwd=git_repo_dir)
    # communicate() waits for the process to end, so returncode is set afterwards;
    # stderr is not piped, so the second element is always None
    out, _ = proc.communicate()
    exit_code = proc.returncode
    if exit_code != 0:
        raise ValueError("Got nonzero exit code {} from git rev-parse".format(exit_code))
    return out.decode('utf-8').rstrip()
||
0 ignored issues
–
show
|
|||
236 | |||
@deprecated(reason="Use klgists.common.flexible_logger instead.")
def init_logger(
        log_path: Optional[str]=None,
        format_str: str='%(asctime)s %(levelname)-8s: %(message)s',
        to_std: bool=True,
        child_logger_name: Optional[str]=None,
        std_level = logging.INFO,
        file_level = logging.DEBUG
):
    """Initializes a logger that can write to a log file and/or stdout.

    :param log_path: If not None, a UTF-8 FileHandler for this path is added
    :param format_str: Format string applied to every handler added here
    :param to_std: If True, a StreamHandler is added
    :param child_logger_name: Name of the logger to configure; None means the root logger
    :param std_level: Level of the stream handler
    :param file_level: Level of the file handler
    """
    if log_path is not None:
        log_dir = os.path.dirname(log_path)
        # a bare file name has no directory part; makedirs also covers nested directories
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    if child_logger_name is None:
        target = logging.getLogger()
    else:
        target = logging.getLogger(child_logger_name)
    target.setLevel(logging.NOTSET)

    formatter = logging.Formatter(format_str)

    if log_path is not None:
        file_handler = logging.FileHandler(log_path, encoding='utf-8')
        file_handler.setLevel(file_level)
        file_handler.setFormatter(formatter)
        target.addHandler(file_handler)

    if to_std:
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(std_level)
        stream_handler.setFormatter(formatter)
        target.addHandler(stream_handler)
||
0 ignored issues
–
show
|
|||
270 | |||
271 | import datetime |
||
0 ignored issues
–
show
|
|||
def format_time(time: datetime.datetime) -> str:
    """Standard timestamp format. Ex: 2016-05-02_22-35-56."""
    return time.strftime("%Y-%m-%d_%H-%M-%S")
||
0 ignored issues
–
show
|
|||
275 | |||
def timestamp() -> str:
    """Standard timestamp of time now. Ex: 2016-05-02_22-35-56."""
    return format_time(datetime.datetime.now())
||
0 ignored issues
–
show
|
|||
279 | |||
def timestamp_path(path: str) -> str:
    """Standard way to label a file path with a timestamp: appends '-' and timestamp()."""
    stamp = timestamp()
    return "{}-{}".format(path, stamp)
||
0 ignored issues
–
show
|
|||
283 | |||
284 | |||
def nice_time(n_ms: int) -> str:
    """Formats a duration in milliseconds as 'Hh, Mm, Ss', or 'Dd, Hh, Mm, Ss' from one day up.

    Sub-second precision is dropped.

    :raises ValueError: If ``n_ms`` is negative
    """
    if n_ms < 0:
        raise ValueError("Duration must be non-negative but was {} ms".format(n_ms))
    length = datetime.timedelta(milliseconds=n_ms)
    # timedelta keeps whole days apart from the remainder, so the day count
    # is neither off by one nor wrapped at month boundaries
    hours, leftover = divmod(length.seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    if length.days < 1:
        return "{}h, {}m, {}s".format(hours, minutes, seconds)
    return "{}d, {}h, {}m, {}s".format(length.days, hours, minutes, seconds)
||
0 ignored issues
–
show
|
|||
291 | |||
292 | |||
def parse_local_iso_datetime(z: str) -> datetime.datetime:
    """Parses a naive ISO-8601 datetime such as 2016-05-02T22:35:56.123456.

    The fractional-seconds part is optional, so strings produced by
    ``datetime.isoformat()`` for whole seconds are accepted too.

    :raises ValueError: If ``z`` matches neither form
    """
    try:
        return datetime.datetime.strptime(z, '%Y-%m-%dT%H:%M:%S.%f')
    except ValueError:
        return datetime.datetime.strptime(z, '%Y-%m-%dT%H:%M:%S')
||
0 ignored issues
–
show
|
|||
295 | |||
296 | |||
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
||
298 | |||
class PipeType(Enum):
    """Identifies which output pipe of a subprocess a line was read from."""
    STDOUT = 1
    STDERR = 2
||
0 ignored issues
–
show
|
|||
302 | |||
303 | def _disp(out, ell, name): |
||
304 | out = out.strip() |
||
0 ignored issues
–
show
|
|||
305 | if '\n' in out: |
||
0 ignored issues
–
show
|
|||
306 | ell(name + ":\n<<=====\n" + out + '\n=====>>') |
||
0 ignored issues
–
show
|
|||
307 | elif len(out) > 0: |
||
0 ignored issues
–
show
|
|||
308 | ell(name + ": <<===== " + out + " =====>>") |
||
0 ignored issues
–
show
|
|||
309 | else: |
||
0 ignored issues
–
show
|
|||
310 | ell(name + ": <no output>") |
||
0 ignored issues
–
show
|
|||
311 | |||
0 ignored issues
–
show
|
|||
312 | |||
def _log(out, err, ell):
    """Reports ``out`` as "stdout" and then ``err`` as "stderr" through ``ell``."""
    _disp(out, ell, "stdout")
    _disp(err, ell, "stderr")
||
0 ignored issues
–
show
|
|||
316 | |||
317 | |||
0 ignored issues
–
show
|
|||
def smart_log_callback(source, line, prefix: str = '') -> None:
    """Logs one line of subprocess output at the level named by the line's own prefix.

    Lines starting with 'FATAL:', 'ERROR:', 'WARNING:', 'INFO:' or 'DEBUG:' are
    logged at the matching level; anything else is logged at DEBUG.

    :param source: Which pipe the line came from; not used here
    :param line: The line, as bytes (decoded as UTF-8) or as str
    :param prefix: Text prepended to the logged message
    """
    if isinstance(line, bytes):
        # undecodable bytes are replaced rather than raised as an exception
        line = line.decode('utf-8', errors='replace')
    # drop the line terminator so log records do not end in a blank line
    line = line.rstrip('\r\n')
    levels = (
        ('FATAL:', logger.critical),
        ('ERROR:', logger.error),
        ('WARNING:', logger.warning),
        ('INFO:', logger.info),
    )
    for marker, emit in levels:
        if line.startswith(marker):
            emit(prefix + line)
            return
    # 'DEBUG:' lines and unrecognised lines are both logged at DEBUG
    logger.debug(prefix + line)
||
0 ignored issues
–
show
|
|||
332 | |||
333 | |||
334 | def _reader(pipe_type, pipe, queue): |
||
335 | try: |
||
0 ignored issues
–
show
|
|||
336 | with pipe: |
||
0 ignored issues
–
show
|
|||
337 | for line in iter(pipe.readline, b''): |
||
0 ignored issues
–
show
|
|||
338 | queue.put((pipe_type, line)) |
||
0 ignored issues
–
show
|
|||
339 | finally: |
||
0 ignored issues
–
show
|
|||
340 | queue.put(None) |
||
0 ignored issues
–
show
|
|||
341 | |||
0 ignored issues
–
show
|
|||
def stream_cmd_call(cmd: List[str], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell_cmd: str=None, cwd: Optional[str] = None, timeout_secs: Optional[float] = None, log_callback: Callable[[PipeType, bytes], None] = None, bufsize: Optional[int] = None) -> None:
    """Calls an external command, streams its output line by line, and throws a ResourceError for nonzero exit codes.

    Returns nothing; every line of stdout and stderr is handed to ``log_callback`` instead.
    The user can optionally provide a shell to run the command with, e.g. "powershell.exe"

    :param cmd: The command and its arguments; each item is converted with str()
    :param stdout: Not used; the process's stdout is always piped so it can be streamed
    :param stderr: Not used; the process's stderr is always piped so it can be streamed
    :param shell_cmd: If set, prepended to ``cmd``
    :param cwd: Working directory for the process
    :param timeout_secs: Timeout for the final wait, which starts only after both pipes are exhausted
    :param log_callback: Called as ``log_callback(pipe_type, line)``; defaults to smart_log_callback
    :param bufsize: Passed to Popen; None selects Popen's default
    :raises ResourceError: If the exit code is nonzero
    """
    if log_callback is None:
        log_callback = smart_log_callback
    cmd = [str(part) for part in cmd]
    if shell_cmd:
        cmd = [shell_cmd] + cmd
    logger.debug("Streaming '{}'".format(' '.join(cmd)))

    proc = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, cwd=cwd, bufsize=-1 if bufsize is None else bufsize)
    try:
        lines = Queue()
        Thread(target=_reader, args=[PipeType.STDOUT, proc.stdout, lines]).start()
        Thread(target=_reader, args=[PipeType.STDERR, proc.stderr, lines]).start()
        # each reader ends its stream with one None, so drain until two have been seen
        for _ in range(2):
            for source, line in iter(lines.get, None):
                log_callback(source, line)
        exit_code = proc.wait(timeout=timeout_secs)
    finally:
        if proc.poll() is None:
            # still running (a callback failed or the wait timed out): stop and reap it
            proc.kill()
            proc.wait()
    if exit_code != 0:
        raise ResourceError("Got nonzero exit code {} from '{}'".format(exit_code, ' '.join(cmd)), cmd, exit_code, '<<unknown>>', '<<unknown>>')
||
0 ignored issues
–
show
|
|||
367 | |||
0 ignored issues
–
show
|
|||
368 | |||
def wrap_cmd_call(cmd: List[str], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell_cmd: Optional[str] = None, cwd: Optional[str] = None, timeout_secs: Optional[float] = None) -> Tuple[Optional[str], Optional[str]]:
    """Calls an external command, waits, and throws a ResourceError for nonzero exit codes.

    The user can optionally provide a shell to run the command with, e.g. "powershell.exe"

    :param cmd: The command and its arguments; each item is converted with str()
    :param stdout: Passed to subprocess.Popen; only a PIPE is captured
    :param stderr: Passed to subprocess.Popen; only a PIPE is captured
    :param shell_cmd: If set, prepended to ``cmd``
    :param cwd: Working directory for the child process
    :param timeout_secs: Passed to communicate() and wait(); None waits indefinitely
    :return: (stdout, stderr) decoded as UTF-8; an item is None if that stream was not captured
    :raises ResourceError: If the exit code is nonzero
    """
    cmd = [str(p) for p in cmd]
    if shell_cmd:
        cmd = [shell_cmd] + cmd
    logger.debug("Calling '{}'".format(' '.join(cmd)))
    p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, cwd=cwd)
    out, err, exit_code = None, None, None
    try:
        (out, err) = p.communicate(timeout=timeout_secs)
        # communicate() returns None for a stream that was not opened with PIPE
        if out is not None:
            out = out.decode('utf-8')
        if err is not None:
            err = err.decode('utf-8')
        exit_code = p.wait(timeout=timeout_secs)
    except Exception:
        # NOTE(review): _log is defined elsewhere; it was already called with None values here before
        _log(out, err, logger.warning)
        raise  # bare raise keeps the original traceback
    finally:
        # make sure the child never outlives this call (e.g. after a timeout)
        p.kill()
    if exit_code != 0:
        _log(out, err, logger.warning)
        raise ResourceError("Got nonzero exit code {} from '{}'".format(exit_code, ' '.join(cmd)), cmd, exit_code, out, err)
    _log(out, err, logger.debug)
    return out, err
||
0 ignored issues
–
show
|
|||
395 | |||
def look(obj: object, attrs: Union[str, Iterable[str]]) -> Any:
    """Looks up a (possibly nested) attribute, returning None if it is missing.

    :param obj: The object to start from
    :param attrs: A dotted attribute path such as 'a.b.c', or an iterable of names that is joined with dots
    :return: The attribute value, or None if an AttributeError was raised along the way
    """
    if not isinstance(attrs, str) and isinstance(attrs, Iterable):
        attrs = '.'.join(attrs)
    try:
        return operator.attrgetter(attrs)(obj)
    except AttributeError:
        return None
||
0 ignored issues
–
show
|
|||
401 | |||
def flatmap(func, *iterable):
    """Maps ``func`` over the iterable(s) and lazily chains the resulting iterables together."""
    mapped = map(func, *iterable)
    return chain.from_iterable(mapped)
||
0 ignored issues
–
show
|
|||
404 | |||
def flatten(*iterable):
    """Concatenates the items of every iterable passed as an argument into one list."""
    return [item for part in iterable for item in part]
||
0 ignored issues
–
show
|
|||
407 | |||
class DevNull:
    """A sink whose write() discards whatever it is given."""

    def write(self, msg):
        """Accepts ``msg`` and does nothing with it."""
        return None
||
0 ignored issues
–
show
|
|||
410 | |||
# Short aliases for os.path functions
pjoin = os.path.join
pexists = os.path.exists
pdir = os.path.isdir
pfile = os.path.isfile
pis_dir = os.path.isdir  # same function as pdir
fsize = os.path.getsize
||
def pardir(path: str, depth: int=1):
    """Strips trailing path components by applying os.path.dirname repeatedly.

    Note that dirname is applied ``depth + 1`` times (not ``depth`` times),
    so the default depth of 1 removes the last two components.

    :param path: The path to start from
    :param depth: One less than the number of dirname applications
    :return: The shortened path
    """
    remaining = depth + 1
    while remaining > 0:
        path = os.path.dirname(path)
        remaining -= 1
    return path
||
0 ignored issues
–
show
|
|||
def grandpardir(path: str):
    """Shorthand for ``pardir(path, 2)``."""
    return pardir(path, depth=2)
||
0 ignored issues
–
show
|
|||
423 | |||
424 | |||
T = TypeVar('T')


def try_index_of(element: T, list_element: List[T]) -> Optional[int]:
    """Returns the index of ``element`` in ``list_element``, or None if it is absent.

    :param element: The item to search for
    :param list_element: The list to search in (despite its name, this is the container)
    :return: The index of the first match, or None if list.index raised a ValueError
    """
    try:
        return list_element.index(element)
    except ValueError:
        return None
||
0 ignored issues
–
show
|
|||
432 | |||
def decorator(cls):
    """Identity function: returns ``cls`` unchanged."""
    return cls
||
0 ignored issues
–
show
|
|||
435 | |||
436 | |||
def exists(keep_predicate: Callable[[T], bool], seq: Iterable[T]) -> bool:
    """Efficient existential quantifier for a filter() predicate.

    Returns true iff keep_predicate is true for one or more elements.
    Stops consuming ``seq`` at the first match.
    """
    return any(keep_predicate(item) for item in seq)
||
0 ignored issues
–
show
|
|||
443 | |||
444 | |||
def zip_strict(*args):
    """Same as zip(), but raises an IndexError if the lengths don't match.

    With no arguments, yields nothing (like zip()).

    :param args: Any number of iterables
    :return: A generator of tuples, one item from each iterable per tuple
    :raises IndexError: If some, but not all, of the iterables are exhausted in the same round
    """
    iters = [iter(axis) for axis in args]
    if not iters:
        # without this guard the loop below would yield () forever
        return
    n_elements = 0
    while True:
        n_elements += 1
        values = []
        failures = []  # indices of the iterables that ran out in this round
        for axis, iterator in enumerate(iters):
            try:
                values.append(next(iterator))
            except StopIteration:
                failures.append(axis)
        if not failures:
            yield tuple(values)
        elif len(failures) == len(iters):
            # every iterable ended in the same round: lengths match.
            # Checked before the single-failure case so one iterable alone ends cleanly.
            return
        elif len(failures) == 1:
            raise IndexError("Too few elements ({}) along axis {}".format(n_elements, failures[0]))
        else:
            raise IndexError("Too few elements ({}) along axes {}".format(n_elements, failures))
||
0 ignored issues
–
show
|
|||
465 | |||
466 | |||
def only(sequence: Iterable[Any]) -> Any:
    """
    Returns the single unique item in the sequence, or raises an exception.
    Duplicates are collapsed first, so each item must be hashable.
    :param sequence: An iterable of any items (untyped)
    :return: The one unique item of the sequence
    :raises: LookupFailedError If the sequence is empty
    :raises: MultipleMatchesError If there is more than one unique item
    """
    unique = set(sequence)
    if not unique:
        raise LookupFailedError("Empty sequence")
    if len(unique) > 1:
        raise MultipleMatchesError("More then 1 item in {}".format(sequence))
    (item,) = unique
    return item
||
0 ignored issues
–
show
|
|||
482 | |||
483 | |||
def read_lines_file(path: str, ignore_comments: bool = False) -> Sequence[str]:
    """
    Returns a list of lines in a file, potentially ignoring comments.
    :param path: Read the file at this local path
    :param ignore_comments: Ignore lines beginning with #, excluding whitespace
    :return: The lines, with surrounding whitespace stripped
    """
    kept = []
    with open(path) as f:
        # iterate over the whole file; a single readline() would return only the first line
        for raw in f:
            line = raw.strip()
            if not ignore_comments or not line.startswith('#'):
                kept.append(line)
    return kept
||
0 ignored issues
–
show
|
|||
497 | |||
498 | |||
def read_properties_file(path: str) -> Mapping[str, str]:
    """
    Reads a .properties file, which is a list of lines with key=value pairs (with an equals sign).
    Blank lines and lines beginning with # are ignored.
    Every other line must contain exactly 1 equals sign.
    :param path: Read the file at this local path
    :return: A dict mapping keys to values, both with surrounding whitespace stripped
    :raises: ParsingError If a line does not contain exactly 1 equals sign
    """
    dct = {}
    # comments are filtered here, not in read_lines_file, so reported line numbers match the file
    for i, line in enumerate(read_lines_file(path, ignore_comments=False)):
        if not line.strip() or line.startswith('#'):
            continue
        if line.count('=') != 1:
            raise ParsingError("Bad line {} in {}".format(i+1, path))
        key, _, value = line.partition('=')
        dct[key.strip()] = value.strip()
    return dct
||
0 ignored issues
–
show
|
|||
516 | |||
517 | |||
class Comparable:
    """Base class that derives ==, !=, >, >= and <= from __lt__. Just implement __lt__.

    Credit to Alex Martelli on https://stackoverflow.com/questions/1061283/lt-instead-of-cmp
    """

    def __eq__(self, other):
        """Equal when neither operand is less than the other."""
        return not (self < other or other < self)

    def __ne__(self, other):
        """Unequal when either operand is less than the other."""
        return self < other or other < self

    def __gt__(self, other):
        """Greater when ``other`` is less than this object."""
        return other < self

    def __ge__(self, other):
        """Greater-or-equal when this object is not less than ``other``."""
        return not self < other

    def __le__(self, other):
        """Less-or-equal when ``other`` is not less than this object."""
        return not other < self
||
0 ignored issues
–
show
|
|||
535 | |||
536 | |||
def json_serial(obj):
    """JSON serializer for objects not serializable by default json code.

    From jgbarah at https://stackoverflow.com/questions/11875770/how-to-overcome-datetime-datetime-not-json-serializable

    :param obj: The object json could not serialize
    :return: An ISO-8601 string for dates and datetimes, or the type name for peewee fields
    :raises TypeError: For any other type
    """
    # datetime.datetime subclasses datetime.date, so one check covers both.
    # This avoids the name `datetime`, which this module imports both as a class and as a module.
    if isinstance(obj, date):
        return obj.isoformat()
    try:
        import peewee
    except ImportError:
        # peewee is optional; without it only dates are supported
        peewee = None
    if peewee is not None and isinstance(obj, peewee.Field):
        return type(obj).__name__
    raise TypeError("Type %s not serializable" % type(obj))
||
0 ignored issues
–
show
|
|||
549 | |||
def pretty_dict(dct: dict) -> str:
    """Renders a dict as JSON with sorted keys and 4-space indentation.

    Values json cannot handle are passed to json_serial, which raises a TypeError for unsupported types.
    """
    options = dict(default=json_serial, sort_keys=True, indent=4)
    return json.dumps(dct, **options)
||
0 ignored issues
–
show
|
|||
553 | |||
def pp_dict(dct: dict) -> None:
    """Prints the pretty_dict() rendering of ``dct`` to stdout."""
    rendered = pretty_dict(dct)
    print(rendered)
||
0 ignored issues
–
show
|
|||
557 | |||
def pp_size(obj: object) -> None:
    """Prints to stdout a human-readable string of the memory usage of arbitrary Python objects. Ex: 8M for 8 megabytes.

    The size comes from sys.getsizeof, which does not follow references into contained objects.
    """
    n_bytes = sys.getsizeof(obj)
    print(hsize(n_bytes))
||
0 ignored issues
–
show
|
|||
561 | |||
def sanitize_str(value: str) -> str:
    """Removes Unicode control (Cc) characters EXCEPT for tabs and newlines (line feed only).

    Line separators (U+2028) and paragraph separators (U+2029) are not in category Cc and are also kept.

    :param value: The text to clean
    :return: ``value`` without the removed characters
    """
    keep = {'\t', '\n', '\u2028', '\u2029'}
    # `or`, not `and`: an `and` here would strip exactly the characters that must survive
    return "".join(ch for ch in value if ch in keep or unicodedata.category(ch) != 'Cc')
||
0 ignored issues
–
show
|
|||
565 | |||
def escape_for_properties(value: Any) -> str:
    """Converts ``value`` to a string, replaces newlines with U+2028, and passes the result through sanitize_str."""
    text = str(value)
    single_line = text.replace('\n', '\u2028')
    return sanitize_str(single_line)
||
0 ignored issues
–
show
|
|||
568 | |||
def escape_for_tsv(value: Any) -> str:
    """Converts ``value`` to a string, replaces newlines with U+2028 and tabs with spaces, and passes the result through sanitize_str."""
    text = str(value)
    single_line = text.replace('\n', '\u2028')
    single_cell = single_line.replace('\t', ' ')
    return sanitize_str(single_cell)
||
0 ignored issues
–
show
|
|||
571 | |||
0 ignored issues
–
show
|
|||
class Timeout:
    """Context manager that raises a TimeoutError if its body runs longer than ``seconds``.

    Uses signal.SIGALRM and signal.alarm, so it needs a platform that provides them.
    The SIGALRM handler that was active on entry is restored on exit.
    """

    def __init__(self, seconds: int = 10, error_message='Timeout'):
        """
        :param seconds: Passed to signal.alarm
        :param error_message: Message of the TimeoutError raised on expiry
        """
        self.seconds = seconds
        self.error_message = error_message
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """Signal handler: raises the TimeoutError."""
        raise TimeoutError(self.error_message)

    def __enter__(self):
        # signal.signal returns the handler it replaces, so it can be put back in __exit__
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        signal.alarm(0)  # cancel any pending alarm
        # the previous handler is None if it was not installed from Python; signal.signal rejects None
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None
||
0 ignored issues
–
show
|
|||
583 | |||
584 | |||
class OverwriteChoice(Enum):
    """Enumeration of four choices: FAIL, WARN, IGNORE and OVERWRITE.

    NOTE(review): presumably selects what to do when a target already exists; confirm against callers.
    """
    FAIL = 1
    WARN = 2
    IGNORE = 3
    OVERWRITE = 4
||
0 ignored issues
–
show
|
|||
590 | |||
def fix_path(path: str) -> str:
    """Normalizes a local path so that both Python and ffmpeg accept it.

    Expands a leading '~' (via os.path.expanduser) and removes './' path components.
    '../' components are left untouched.

    :param path: A local path
    :return: The normalized path
    :raises ValueError: If the path contains a percent sign
    """
    # ffmpeg won't recognize './' and will simply not write images!
    # and Python doesn't recognize ~
    if '%' in path:
        raise ValueError(
            'For technical limitations (regarding ffmpeg), local paths cannot contain a percent sign (%), but "{}" does'.format(path)
        )
    if path.startswith('~'):
        # expanduser handles '~', '~/x' and '~user/x', and does not require $HOME to be set
        path = os.path.expanduser(path)
    # drop './' only where it is a whole component (at the start or right after a '/');
    # a plain replace('./', '') would also turn '../x' into '.x'
    return re.sub(r'(?<![^/])\./', '', path)
||
0 ignored issues
–
show
|
|||
601 | |||
def fix_path_platform_dependent(path: str) -> str:
    r"""Modifies path strings to work with Python and external tools.

    First applies fix_path. On Windows, additionally splits the path on / and \,
    rejoins the pieces with os.path.join, and appends a backslash to every colon.
    On other platforms the result of fix_path is returned as-is.

    :param path: A local path
    :return: The adjusted path
    """
    path = fix_path(path)
    if platform.system() != 'Windows':
        return path
    # on Windows, accept either / or \ as the separator
    bits = re.split(r'[/\\]', path)
    return pjoin(*bits).replace(":", ":\\")
||
0 ignored issues
–
show
|
|||
614 | |||
615 | |||
616 | # NTFS doesn't allow these, so let's be safe |
||
617 | # Also exclude control characters |
||
618 | # 127 is the DEL char |
||
619 | _bad_chars = {'/', ':', '<', '>', '"', "'", '\\', '|', '?', '*', chr(127), *{chr(i) for i in range(0, 32)}} |
||
0 ignored issues
–
show
|
|||
620 | assert ' ' not in _bad_chars |
||
621 | def _sanitize_bit(p: str) -> str: |
||
0 ignored issues
–
show
Argument name "p" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site. ![]() |
|||
622 | for b in _bad_chars: p = p.replace(b, '-') |
||
0 ignored issues
–
show
Variable name "b" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site. ![]() |
|||
623 | return p |
||
0 ignored issues
–
show
|
|||
def pjoin_sanitized_rel(*pieces: Any) -> str:
    r"""Builds a path from a hierarchy, sanitizing the path by replacing /, :, <, >, ", ', \, |, ?, *, <DEL>, and control characters 0–31 with a hyphen-minus (-).

    Each input to pjoin_sanitized_rel must refer only to a single directory or file (cannot contain a path separator).
    This means that you cannot have an absolute path (it would begin with os.sep, probably /); use pjoin_sanitized_abs for this.

    :param pieces: The path components; each is converted with str() before sanitizing
    :return: The sanitized components joined with the platform separator
    """
    return pjoin(*[_sanitize_bit(str(bit)) for bit in pieces])
||
0 ignored issues
–
show
|
|||
def pjoin_sanitized_abs(*pieces: Any) -> str:
    """Same as pjoin_sanitized_rel but starts with os.sep (the root directory).

    :param pieces: The path components; each is converted with str() before sanitizing
    :return: An absolute path built from the sanitized components
    """
    return pjoin(os.sep, pjoin_sanitized_rel(*pieces))
||
0 ignored issues
–
show
|
|||
633 | |||
634 | |||
def make_dirs(output_dir: str):
    """Makes a directory (and any missing parents) if it doesn't exist.

    :param output_dir: The directory to create
    :raises PathIsNotADirError: If the path already exists and is not a directory
    """
    try:
        # exist_ok avoids the race between a separate existence check and the creation
        os.makedirs(output_dir, exist_ok=True)
    except FileExistsError:
        if os.path.isdir(output_dir):
            return
        raise PathIsNotADirError("{} already exists and is not a directory".format(output_dir))
||
0 ignored issues
–
show
|
|||
643 | |||
644 | |||
def remake_dirs(output_dir: str):
    """Makes an empty directory, first deleting it (recursively) if it already exists.

    May raise a PathIsNotADirError if the path exists and is not a directory.
    """
    if os.path.exists(output_dir):
        if not os.path.isdir(output_dir):
            raise PathIsNotADirError("{} already exists and is not a directory".format(output_dir))
        shutil.rmtree(output_dir)
    make_dirs(output_dir)
||
0 ignored issues
–
show
|
|||
654 | |||
655 | |||
656 | |||
def lines(file_name: str, known_encoding='utf-8') -> Iterator[str]:
    """Lazily reads a text file or gzipped text file, yielding each line without its trailing newline characters.

    If the file name ends with '.gz' or '.gzip', assumes the file is Gzipped.
    Arguments:
        known_encoding: Applied only when decoding gzip; plain files are opened with the default encoding
    """
    if file_name.endswith(('.gz', '.gzip')):
        handle = io.TextIOWrapper(gzip.open(file_name, 'r'), encoding=known_encoding)
    else:
        handle = open(file_name, 'r')
    with handle:
        for raw in handle:
            yield raw.rstrip('\n\r')
||
0 ignored issues
–
show
|
|||
669 | |||
670 | import dill |
||
0 ignored issues
–
show
|
|||
671 | |||
def pkl(data, path: str):
    """Serializes ``data`` with dill and writes it to the file at ``path``, opened in binary write mode."""
    with open(path, 'wb') as sink:
        dill.dump(data, sink)
||
0 ignored issues
–
show
|
|||
675 | |||
def unpkl(path: str):
    """Loads and returns the object that dill reads from the file at ``path``.

    NOTE(review): like pickle, loading can execute arbitrary code; only use on trusted files.
    """
    with open(path, 'rb') as source:
        return dill.load(source)
||
0 ignored issues
–
show
|
|||
679 | |||
680 | |||
def file_from_env_var(var: str) -> str:
    """
    Returns the path of the file named by an environment variable, after passing it through fix_path.
    :param var: The environment variable name, not including the $
    :return: The path, verified to exist and to be a file
    :raises: MissingResourceError If the variable is not set, or the path does not exist or is not a file
    """
    if var not in os.environ:
        raise MissingResourceError('Environment variable ${} is not set'.format(var))
    resolved = fix_path(os.environ[var])
    if not pexists(resolved):
        raise MissingResourceError("{} file {} does not exist".format(var, resolved))
    if not pfile(resolved):
        raise MissingResourceError("{} file {} is not a file".format(var, resolved))
    return resolved
||
0 ignored issues
–
show
|
|||
695 | |||
696 | |||
def is_proper_file(path: str) -> bool:
    """Returns True iff the final component of ``path`` is non-empty and does not begin with '.', '~', or '_'.

    Only the name is inspected; the filesystem is not consulted.
    """
    name = os.path.basename(path)
    return bool(name) and not name.startswith(('.', '~', '_'))
||
0 ignored issues
–
show
|
|||
700 | |||
701 | |||
def scantree(path: str, follow_symlinks: bool=False) -> Iterator[str]:
    """List the full path of every file not beginning with '.', '~', or '_' in a directory, recursively.

    Only file names are filtered; every directory is descended into regardless of its name.
    .. deprecated Use scan_for_proper_files, which has a better name

    :param path: The directory to scan
    :param follow_symlinks: Whether symlinks to directories are descended into, at every depth
    """
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=follow_symlinks):
            # pass the flag down; otherwise nested levels would silently fall back to the default
            yield from scantree(entry.path, follow_symlinks=follow_symlinks)
        elif is_proper_file(entry.path):
            yield entry.path

scan_for_proper_files = scantree
||
713 | |||
714 | |||
def scan_for_files(path: str, follow_symlinks: bool=False) -> Iterator[str]:
    """
    Using a generator, list all files in a directory or one of its subdirectories.
    Useful for iterating over files in a directory recursively if there are thousands of files.
    Warning: If there are looping symlinks, follow_symlinks will return an infinite generator.

    :param path: The directory to scan
    :param follow_symlinks: Whether symlinks to directories are descended into, at every depth
    :return: The path of every entry that is not descended into
    """
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=follow_symlinks):
            # pass the flag down; otherwise nested levels would silently fall back to the default
            yield from scan_for_files(entry.path, follow_symlinks=follow_symlinks)
        else:
            yield entry.path
||
0 ignored issues
–
show
|
|||
726 | |||
727 | |||
def walk_until(some_dir, until: Callable[[str], bool]) -> Iterator[typing.Tuple[str, List[str], List[str]]]:
    """Walk but stop recursing after 'until' occurs.

    Returns files and directories in the same manner as os.walk.
    A directory for which ``until`` is true is still yielded, but is not descended into.

    :param some_dir: The directory to start from; must exist
    :param until: Called with each directory path after that directory has been yielded
    """
    stripped = some_dir.rstrip(os.path.sep)
    if some_dir and not stripped:
        # the path consisted only of separators (the root directory); keep one
        stripped = os.path.sep
    some_dir = stripped
    assert os.path.isdir(some_dir)
    for root, dirs, files in os.walk(some_dir):
        yield root, dirs, files
        if until(root):
            # os.walk only descends into what is left in this list, so emptying it stops the recursion
            del dirs[:]
||
0 ignored issues
–
show
|
|||
738 | |||
739 | |||
def walk_until_level(some_dir, level: Optional[int]=None) -> Iterator[typing.Tuple[str, List[str], List[str]]]:
    """
    Walk up to a maximum recursion depth.
    Returns files and directories in the same manner as os.walk
    Taken partly from https://stackoverflow.com/questions/7159607/list-directories-with-a-specified-depth-in-python
    :param some_dir: The directory to start from; must exist
    :param level: Maximum recursion depth, starting at 0 (only ``some_dir`` itself). None behaves like 0.
    """
    some_dir = some_dir.rstrip(os.path.sep)
    assert os.path.isdir(some_dir)
    # depth is measured by counting separators relative to the starting directory
    base_depth = some_dir.count(os.path.sep)
    for root, dirs, files in os.walk(some_dir):
        yield root, dirs, files
        current_depth = root.count(os.path.sep)
        if level is None or base_depth + level <= current_depth:
            # os.walk only descends into what is left in this list, so emptying it stops the recursion
            del dirs[:]
||
0 ignored issues
–
show
|
|||
756 | |||
757 | |||
class SubcommandHandler:
    """
    A convenient wrapper for a program that uses command-line subcommands.

    Calls the public method of ``target`` whose name matches the subcommand given on the command line.
    Names starting with an underscore are never dispatched.

    :param parser: Should contain a description and help text, but should NOT contain any arguments.
    :param target: An object (or type) that contains a method for each subcommand; a dash (-) in the argument is converted to an underscore.
    :param temp_dir: A temporary directory; if given, it is recreated empty before the subcommand runs and deleted afterwards
    :param error_handler: Called after logging any exception except for KeyboardInterrupt or SystemExit (exceptions in here are ignored)
    :param cancel_handler: Called after logging a KeyboardInterrupt or SystemExit (exceptions in here are ignored)
    """
    def __init__(self,
            parser: argparse.ArgumentParser, target: Any,
            temp_dir: Optional[str] = None,
            error_handler: Callable[[BaseException], None] = lambda e: None,
            cancel_handler: Callable[[Union[KeyboardInterrupt, SystemExit]], None] = lambda e: None
    ) -> None:
        # The handler owns the single positional argument; this mutates the parser passed in.
        parser.add_argument('subcommand', help='Subcommand to run')
        self.parser = parser
        self.target = target
        self.temp_dir = temp_dir
        self.error_handler = error_handler
        self.cancel_handler = cancel_handler

    def run(self, args: List[str]) -> None:
        """
        Parse the subcommand from ``args`` and invoke the matching method on the target.

        Only ``args[1]`` is parsed (``args[0]`` is taken to be the program name, as in ``sys.argv``).
        An unknown or underscore-prefixed subcommand prints an error plus the parser help and returns.
        A NaturalExpectedError raised by the subcommand is ignored; every other exception is logged,
        passed to the matching handler, and then re-raised.

        :param args: The full argument list, including the program name at index 0
        """
        full_args = self.parser.parse_args(args[1:2])
        subcommand = full_args.subcommand.replace('-', '_')

        # Reject private/dunder names as well as names the target does not have, so that
        # things like ``__init__`` or internal helpers can never be dispatched from the command line.
        if subcommand.startswith('_') or not hasattr(self.target, subcommand):
            print(Fore.RED + 'Unrecognized subcommand {}'.format(subcommand))
            self.parser.print_help()
            return

        # clever; from Chase Seibert: https://chase-seibert.github.io/blog/2014/03/21/python-multilevel-argparse.html
        # use dispatch pattern to invoke method with same name
        try:
            self._prepare_temp_dir()
            getattr(self.target, subcommand)()
        except NaturalExpectedError:
            pass  # ignore totally
        except KeyboardInterrupt as e:
            self._report("Received cancellation signal", self.cancel_handler, e)
            raise
        except SystemExit as e:
            self._report("Received system exit signal", self.cancel_handler, e)
            raise
        except BaseException as e:
            self._report("{} failed!".format(self.parser.prog), self.error_handler, e)
            raise
        finally:
            self._remove_temp_dir()

    def _prepare_temp_dir(self) -> None:
        """Recreate ``temp_dir`` as an empty directory; raise PathIsNotADirError if it exists but is not a directory."""
        if self.temp_dir is None:
            return
        if pexists(self.temp_dir):
            if not pdir(self.temp_dir):
                raise PathIsNotADirError(self.temp_dir)
            shutil.rmtree(self.temp_dir)
        remake_dirs(self.temp_dir)
        logger.debug("Created temp dir at {}".format(self.temp_dir))

    def _remove_temp_dir(self) -> None:
        """Delete ``temp_dir`` after the run, but only if it is actually a directory."""
        if self.temp_dir is None or not pexists(self.temp_dir):
            return
        if not pdir(self.temp_dir):
            # A non-directory here is the path that made _prepare_temp_dir fail; it is not ours to delete,
            # and calling rmtree on it would raise and mask the original error.
            return
        shutil.rmtree(self.temp_dir)
        logger.debug("Deleted temp dir at {}".format(self.temp_dir))

    @staticmethod
    def _report(message: str, handler: Callable[[Any], None], error: BaseException) -> None:
        """Log ``message`` with the active traceback and notify ``handler``; failures in either are ignored."""
        try:
            logger.fatal(message, exc_info=True)
            handler(error)
        except BaseException:
            # Deliberately best-effort (see the class docstring): a broken handler must not
            # replace the exception that the caller is about to re-raise.
            pass
||
0 ignored issues
–
show
|
|||
828 | |||
829 | #__all__ = ['scan_for_files', 'walk_until', 'walk_until_level'] |
||
830 |