Completed
Push — master ( d09250...b67a9c )
by Alexandre M.
9s
created

copy_all_files()   B

Complexity

Conditions 6

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
c 0
b 0
f 0
dl 0
loc 20
rs 8
1
import itertools
2
import os
3
import shutil
4
from collections import defaultdict, OrderedDict
5
from typing import Iterator, List, Tuple, Dict
6
7
import hansel
8
from hansel.utils import _get_matching_items
9
10
# Alias for the "values map" (records) structure handled in this module:
# a collection of records, each record being a sequence of
# `(arg_name, arg_value)` 2-tuples.
CrumbArgsMap = Iterator[List[Tuple[str, str]]]
11
12
13
def joint_value_map(crumb: 'hansel.Crumb', arg_names: Iterator[str], check_exists: bool = True) -> 'CrumbArgsMap':
    """Return the joint values of the crumb arguments named in `arg_names`.

    Parameters
    ----------
    crumb: hansel.Crumb

    arg_names: List[str]
        Names of the crumb arguments to combine. Any iterable is accepted; it
        is materialised once.

    check_exists: bool
        If True, keep only the combinations of argument values for which
        `crumb.replace(...)` gives a crumb whose `exists()` is true.
        Otherwise return all the possible combinations without checking.
        When `arg_names` holds exactly one name, the values listed by
        `crumb[arg_name]` are returned as they are and this flag is not
        consulted.

    Returns
    -------
    values_map: list of tuples of 2-tuples
        I call values_map what is called `record` in pandas. Each item is a
        tuple with one 2-tuple of shape (arg_name, arg_value) per requested
        argument. For more than one argument the result is sorted.
    """
    # `len()` is needed below, so a generator must be turned into a list first.
    arg_names = list(arg_names)

    values_map = [[(arg_name, arg_value) for arg_value in crumb[arg_name]]
                  for arg_name in arg_names]

    if len(arg_names) == 1:
        return [(item,) for item in values_map[0]]

    # Every combination of one (name, value) pair per argument.
    combinations = set(itertools.product(*values_map))

    if check_exists:
        combinations = {args for args in combinations
                        if crumb.replace(**dict(args)).exists()}

    return sorted(combinations)
54
55
56 View Code Duplication
def intersection(crumb1: hansel.Crumb, crumb2: hansel.Crumb, on: Iterator[str]=None) -> List[Tuple[Tuple[str, str], ...]]:
    """Return an 'inner join' of both given Crumbs, i.e., the argument value
    combinations that are found in both `crumb1` and `crumb2`.

    If `on` is None, will use all the common arguments names of both crumbs.
    Otherwise will use only the elements of `on`. All its items must be in
    both crumbs.

    Parameters
    ----------
    crumb1: hansel.Crumb

    crumb2: hansel.Crumb

    on: str or list of str
        Crumb argument names common to both input crumbs.

    Returns
    -------
    inner_join: list of tuples of 2-tuples
        Sorted list of the value combinations present in both crumbs. Each
        item holds one 2-tuple of shape (arg_name, arg_value) per matched
        argument, as produced by `joint_value_map`.

    Raises
    ------
    KeyError:
        If no matching argument names are found between both crumbs.

    ValueError:
        Presumably raised by `_get_matching_items` if an element of `on` is
        missing from one of the crumbs -- to be confirmed against
        `hansel.utils`.

    Notes
    -----
    Use with care, ideally the argument matches should be in the same order in
    both crumbs.

    Both crumbs must have at least one matching identifier argument and one
    of those must be the one in `on`.
    """
    if isinstance(on, str):
        on = [on]

    arg_names = list(_get_matching_items(list(crumb1.all_args()),
                                         list(crumb2.all_args()),
                                         items=on))

    if not arg_names:
        raise KeyError("Could not find matching arguments between {} and  {} limited by {}.".format(
            list(crumb1.all_args()),
            list(crumb2.all_args()),
            on)
        )

    maps1 = joint_value_map(crumb1, arg_names, check_exists=True)
    maps2 = joint_value_map(crumb2, arg_names, check_exists=True)

    # `sorted` takes any iterable, no intermediate list is needed.
    return sorted(set(maps1) & set(maps2))
116
117
118 View Code Duplication
def difference(crumb1: 'hansel.Crumb', crumb2: 'hansel.Crumb', on: Iterator[str] = None) -> List[Tuple[Tuple[str, str], ...]]:
    """Return the difference `crumb1` - `crumb2`, i.e., the argument value
    combinations that are found in `crumb1` but not in `crumb2`.

    If `on` is None, will use all the common arguments names of both crumbs.
    Otherwise will use only the elements of `on`. All its items must be in
    both crumbs.

    Parameters
    ----------
    crumb1: hansel.Crumb

    crumb2: hansel.Crumb

    on: str or list of str
        Crumb argument names common to both input crumbs.

    Returns
    -------
    diff: list of tuples of 2-tuples
        Sorted list of the value combinations present in `crumb1` and absent
        from `crumb2`. Each item holds one 2-tuple of shape
        (arg_name, arg_value) per matched argument, as produced by
        `joint_value_map`.

    Raises
    ------
    KeyError:
        If no matching argument names are found between both crumbs.

    ValueError:
        Presumably raised by `_get_matching_items` if an element of `on` is
        missing from one of the crumbs -- to be confirmed against
        `hansel.utils`.

    Notes
    -----
    Use with care, ideally the argument matches should be in the same order in
    both crumbs.

    Both crumbs must have at least one matching identifier argument and one
    of those must be the one in `on`.
    """
    if isinstance(on, str):
        on = [on]

    arg_names = list(_get_matching_items(list(crumb1.all_args()),
                                         list(crumb2.all_args()),
                                         items=on))

    if not arg_names:
        raise KeyError("Could not find matching arguments between "
                       "{} and  {} limited by {}.".format(list(crumb1.all_args()),
                                                          list(crumb2.all_args()),
                                                          on))

    maps1 = joint_value_map(crumb1, arg_names, check_exists=True)
    maps2 = joint_value_map(crumb2, arg_names, check_exists=True)

    # `sorted` takes any iterable, no intermediate list is needed.
    return sorted(set(maps1) - set(maps2))
179
180
181
def valuesmap_to_dict(values_map: CrumbArgsMap) -> Dict[str, List[str]]:
    """Turn a values_map (records) into a dictionary of lists of values.

    Each record of `values_map`, a sequence of 2-tuples with shape
    '(arg_name, arg_value)', is converted to an ordered dict and all of them
    are merged by `append_dict_values`, so that each arg_name is mapped to
    the list of its values.

    Parameters
    ----------
    values_map: list of list of 2-tuple of str

    Returns
    -------
    adict: dict
        The values in `values_map` grouped by arg_name.

    Raises
    ------
    IndexError
        If `values_map` is empty.

    KeyError
        If any record in `values_map` doesn't have all the keys of the first
        record.
    """
    records = []
    for record in values_map:
        records.append(OrderedDict(record))
    return append_dict_values(records)
205
206
207
def append_dict_values(list_of_dicts: Iterator[Dict[str, str]], keys: Iterator[str]=None) -> Dict[str, List[str]]:
    """Return a dict of lists from a list of dicts with the same keys as the
    internal dicts.
    For each dict in list_of_dicts will look for the values of the given keys
    and append it to the output dict.

    Parameters
    ----------
    list_of_dicts: list of dicts
        The first dict in this list will be used as reference for the key names
        of all the other dicts. Any iterable of dicts is accepted; it is
        materialised once.

    keys: list of str
        List of keys to create in the output dict
        If None will use all keys in the first element of list_of_dicts

    Returns
    -------
    collections.defaultdict of lists
        Maps each key to the list of its values, in the order of
        `list_of_dicts`.

    Raises
    ------
    IndexError
        If `keys` is None and `list_of_dicts` is empty.

    KeyError
        If any dict inside the `list_of_dicts` doesn't have all the requested
        keys.
    """
    # The input is indexed and then iterated, which a one-shot iterator
    # would not survive.
    list_of_dicts = list(list_of_dicts)

    if keys is None:
        try:
            keys = list(list_of_dicts[0].keys())
        except IndexError as error:
            raise IndexError('Could not get the first element of the list.') from error
    else:
        # The keys are walked once per dict, so they must be re-iterable.
        keys = list(keys)

    dict_of_lists = defaultdict(list)
    for record in list_of_dicts:
        for key in keys:
            dict_of_lists[key].append(record[key])
    return dict_of_lists
246
247
248
def copy_args(src_crumb: hansel.Crumb, dst_crumb: hansel.Crumb):
    """Fill the open arguments of `dst_crumb` with values from `src_crumb`.

    Each open argument of `dst_crumb` is set to the first value that
    `src_crumb` holds for the argument of the same name. `dst_crumb` is
    modified in place and nothing is returned.
    """
    for open_name in dst_crumb.open_args():
        first_value = src_crumb[open_name][0]
        dst_crumb[open_name] = first_value
254
255
256
def _remove_if_ok_and_exists(path: str, exist_ok: bool):
    """Make sure nothing is left at `path`, removing it if allowed.

    Parameters
    ----------
    path: str
        Path to a file or a symbolic link.

    exist_ok: bool
        If True an existing `path` is removed with `os.remove`.
        If False an existing `path` is an error.

    Raises
    ------
    FileExistsError
        If `path` exists and `exist_ok` is False.
    """
    # `lexists` is also true for a dangling symbolic link, which `exists`
    # would report as missing and leave in place.
    if not os.path.lexists(path):
        return

    if not exist_ok:
        raise FileExistsError('Path {} already exists.'.format(path))

    os.remove(path)
263
264
265
def copy_all_files(src_path: str, dst_path: str, exist_ok: bool=True, verbose: bool=False):
    """Will copy everything from `src_path` to `dst_path`.
    Both can be a folder path or a file path.

    Parameters
    ----------
    src_path: str
        Path to an existing file or folder. If it is neither, nothing is
        copied.

    dst_path: str
        Path where the copy will be created. Missing parent folders of a
        destination file are created.

    exist_ok: bool
        If True whatever is in `dst_path` is replaced by the copy.
        If False an existing `dst_path` is an error.

    verbose: bool
        If True will print the source and destination paths.

    Raises
    ------
    FileExistsError
        If `dst_path` exists and `exist_ok` is False.
    """
    copy_func = shutil.copy2
    if verbose:
        print("Copying {} -> {}".format(src_path, dst_path))

    if os.path.isdir(src_path):
        # Clearing the destination would wipe the source when both are the
        # same folder, so there is nothing to do in that case.
        if os.path.realpath(src_path) == os.path.realpath(dst_path):
            return

        # Only clear the destination if there is something to clear.
        if exist_ok and os.path.lexists(dst_path):
            if os.path.isdir(dst_path) and not os.path.islink(dst_path):
                shutil.rmtree(dst_path)
            else:
                os.remove(dst_path)

        shutil.copytree(src_path, dst_path, copy_function=copy_func)
    elif os.path.isfile(src_path):
        # `exist_ok` is about the destination file, not about its folder,
        # which many files are allowed to share.
        if not exist_ok and os.path.lexists(dst_path):
            raise FileExistsError('Path {} already exists.'.format(dst_path))

        dst_dir = os.path.dirname(dst_path)
        if dst_dir:
            os.makedirs(dst_dir, exist_ok=True)

        try:
            copy_func(src_path, dst_path, follow_symlinks=True)
        except shutil.SameFileError:
            same_real_path = os.path.realpath(src_path) == os.path.realpath(dst_path)
            if same_real_path and not os.path.islink(dst_path):
                # The destination holds the data itself: removing it would
                # destroy the source.
                return

            # The destination is a link to the source: swap it for a copy.
            os.remove(dst_path)
            copy_func(src_path, dst_path, follow_symlinks=True)
285
286
287
def link_all_files(src_path: str, dst_path: str, exist_ok: bool=True, verbose: bool=False):
    """Make a symbolic link in `dst_path` that points to `src_path`.

    Parameters
    ----------
    src_path: str
        Target of the link. A relative path is rewritten to be relative to
        the folder of `dst_path`, an absolute path is used as given.

    dst_path: str
        Path of the link to create. Its parent folders are created if needed.

    exist_ok: bool
        If True an existing `dst_path` is removed before linking.
        If False an existing `dst_path` raises a FileExistsError.

    verbose: bool
        If True will print the link target and the link path.
    """
    dst_dir = os.path.dirname(dst_path)

    if not os.path.isabs(src_path):
        src_path = os.path.relpath(src_path, dst_dir or os.curdir)

    if verbose:
        print("Linking {} -> {}".format(src_path, dst_path))

    # A bare file name has no folder to create and `os.makedirs('')` raises.
    if dst_dir:
        os.makedirs(dst_dir, exist_ok=True)

    _remove_if_ok_and_exists(dst_path, exist_ok=exist_ok)
    os.symlink(src_path, dst_path)
299
300
301
def _crumb_fill_dst(src_crumb: hansel.Crumb, dst_crumb: hansel.Crumb) -> Iterator[Tuple[hansel.Crumb, hansel.Crumb]]:
    """Yield pairs of a listed source crumb and its filled destination crumb.

    For each item of `src_crumb.ls()` a copy of `dst_crumb` is made and its
    open arguments are filled through `copy_args` with the values of that
    item. `dst_crumb` itself is not modified.

    The arguments of each listed item must define `dst_crumb` entirely,
    otherwise an AttributeError is raised as soon as that item is reached.
    """
    for source_item in src_crumb.ls():
        filled = dst_crumb.copy()
        copy_args(source_item, filled)

        if not filled.has_crumbs():
            yield source_item, filled
            continue

        raise AttributeError("Destination crumb still has open arguments, "
                             "expected to fill it. Got {}.".format(str(filled)))
314
315
316
def crumb_copy(src_crumb: hansel.Crumb, dst_crumb: hansel.Crumb, exist_ok: bool=False, verbose: bool=False):
    """Copy what `src_crumb` lists into the paths built from `dst_crumb`.

    Both crumbs must have a similar set of argument names: the arguments of
    every item listed by `src_crumb` must define `dst_crumb` entirely and
    make a path to a file or folder.

    `exist_ok` and `verbose` are passed on to `copy_all_files` for each
    source and destination pair.
    """
    filled_pairs = _crumb_fill_dst(src_crumb, dst_crumb)
    for source, target in filled_pairs:
        copy_all_files(source.path, target.path, exist_ok=exist_ok, verbose=verbose)
325
326
327
def crumb_link(src_crumb: hansel.Crumb, dst_crumb: hansel.Crumb, exist_ok: bool=False, verbose: bool=False):
    """Link what `src_crumb` lists into the paths built from `dst_crumb`.

    Both crumbs must have a similar set of argument names: the arguments of
    every item listed by `src_crumb` must define `dst_crumb` entirely and
    make a path to a file or folder.

    The folder structure is created in the base of `dst_crumb` and only the
    leaf nodes are linked. `exist_ok` and `verbose` are passed on to
    `link_all_files` for each source and destination pair.
    """
    filled_pairs = _crumb_fill_dst(src_crumb, dst_crumb)
    for source, target in filled_pairs:
        link_all_files(source.path, target.path, exist_ok=exist_ok, verbose=verbose)
338
339
340
def groupby_pattern(
    crumb: 'hansel.Crumb',
    arg_name: str,
    groups: Dict[str, str]
) -> Dict[str, List['hansel.Crumb']]:
    """Return a dictionary with the matches of `groups` values in the
    crumb argument `arg_name` in `crumb`.

    Parameters
    ----------
    crumb: Crumb
        Crumb to the folder tree.

    arg_name: str
        Name of the crumb argument in `crumb` that must be matched with the
        values of the `groups` dict.

    groups: dict[str]->str
        A dict where the keys are group names and the values are regular
        expressions (fnmatch xor re).

    Returns
    -------
    grouped: dict[str] -> list[Crumb]
        Map of paths from groups to the corresponding path matches.
        Groups without any match are left out.

    Raises
    ------
    KeyError
        If `arg_name` is not in `crumb`.

    Notes
    -----
    The pattern of `arg_name` in `crumb` is set for each group and cleared
    again afterwards, also when listing the crumb fails.
    """
    if arg_name not in crumb:
        raise KeyError('Crumb {} has no argument {}.'.format(crumb, arg_name))

    grouped = defaultdict(list)
    for group_name, pattern in groups.items():
        crumb.set_pattern(arg_name, pattern)
        try:
            paths = crumb.ls(arg_name)
        finally:
            # Never leave the temporary pattern on the caller's crumb.
            crumb.clear_pattern(arg_name)

        if paths:
            grouped[group_name] = paths

    return grouped
381