Completed
Push — master ( 3ef9fd...3c6cae )
by Alexandre M.
56s
created

hansel.Crumb._arg_values()   C

Complexity

Conditions 8

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 56
rs 6.1256

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# -*- coding: utf-8 -*-
2
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
3
# vi: set ft=python sts=4 ts=4 sw=4 et:
4
"""
5
Crumb class: the smart path model class.
6
"""
7
8
import os.path     as op
from   copy        import deepcopy
from   collections import OrderedDict
from   pathlib     import Path
from   functools   import partial

# The container ABCs live in `collections.abc`; the aliases in `collections`
# were removed in Python 3.10, so importing them from there fails at import time.
try:
    from collections.abc import Mapping, Sequence
except ImportError:  # Python 2 has no `collections.abc`
    from collections import Mapping, Sequence

from   six import string_types

from   .utils import remove_duplicates, list_children
from   ._utils import (_get_path, _arg_name,
                       _is_crumb_arg, _replace,
                       _split_exists, _split,
                       _touch, has_crumbs, is_valid,
                       #_arg_format,
                       )
23
24
25
class Crumb(object):
    """ The crumb path model class.

    A crumb path is a file or folder path where some of the path components
    are named arguments written between `_arg_start_sym` and `_arg_end_sym`,
    e.g. ``{subject_id}``. The position of every argument in
    `path.split(op.sep)` is kept in `self._argidx`.

    Parameters
    ----------
    crumb_path: str
        A file or folder path with crumb arguments. See Examples.

    ignore_list: sequence of str
        A list of `fnmatch` patterns of filenames to be ignored.

    Examples
    --------
    >>> crumb = Crumb("{base_dir}/raw/{subject_id}/{session_id}/{modality}/{image}")
    >>> cr = Crumb(op.join(op.expanduser('~'), '{user_folder}'))
    """
    _arg_start_sym = '{'
    _arg_end_sym   = '}'

    # specify partial functions from _utils with _arg_start_sym and _arg_end_sym
    # everything would be much simpler if I hardcoded these symbols but I still
    # feel that this flexibility is nice to have.
    # NOTE: `partial` objects are not descriptors, so `self._replace(...)` does
    # not receive `self` as first argument.
    # _arg_format   = partial(_arg_format,   start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _is_crumb_arg = partial(_is_crumb_arg, start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _arg_name     = partial(_arg_name,     start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    is_valid      = partial(is_valid,      start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    has_crumbs    = partial(has_crumbs,    start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _replace      = partial(_replace,      start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _split        = partial(_split,        start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _touch        = partial(_touch,        start_sym=_arg_start_sym, end_sym=_arg_end_sym)
    _split_exists = partial(_split_exists, start_sym=_arg_start_sym, end_sym=_arg_end_sym)

    def __init__(self, crumb_path, ignore_list=()):
        self._path   = _get_path(crumb_path)
        self._argidx = OrderedDict()
        self._ignore = ignore_list
        self._update()

    @property
    def path(self):
        """Return the current crumb path string."""
        return self._path

    @path.setter
    def path(self, value):
        """ Set the current crumb path string and update the internal members.

        Parameters
        ----------
        value: str
            A file or folder path with crumb arguments. See Examples in class docstring.

        Raises
        ------
        ValueError: if `value` is not a valid crumb path.
        """
        self._path = value
        self._update()

    def _check(self):
        """ Raise a ValueError if the current crumb path is not valid."""
        if not self.is_valid(self._path):
            raise ValueError("The current crumb path has errors, got {}.".format(self.path))

    def _update(self):
        """ Clean up, parse the current crumb path and fill the internal
        members for functioning."""
        self._clean()
        self._check()
        self._set_argidx()
        # self._set_replace_func()

    def _clean(self):
        """ Clean up the private utility members, i.e., _argidx. """
        self._argidx = OrderedDict()

    @classmethod
    def copy(cls, crumb):
        """ Return a copy of the given `crumb`.

        Parameters
        ----------
        crumb: str or Crumb

        Returns
        -------
        copy: Crumb
            A new instance built from the path (and the ignore list, when
            `crumb` is a Crumb) of `crumb`.

        Raises
        ------
        TypeError: if `crumb` is neither a Crumb nor a string.
        """
        if isinstance(crumb, cls):
            return cls(crumb._path, ignore_list=crumb._ignore)

        if isinstance(crumb, string_types):
            return cls.from_path(crumb)

        raise TypeError("Expected a Crumb or a str to copy, got {}.".format(type(crumb)))

    def _set_argidx(self):
        """ Initialize the self._argidx dict. It holds arg_name -> index.
        The index is the position in the whole `_path.split(op.sep)` where each argument is.
        """
        for idx, part in enumerate(self._path_split()):
            if self._is_crumb_arg(part):
                self._argidx[self._arg_name(part)] = idx

    def _find_arg(self, arg_name):
        """ Return the index in the current path of the crumb
        argument with name `arg_name`, or -1 if there is no such argument.
        """
        return self._argidx.get(arg_name, -1)

    def isabs(self):
        """ Return True if the current crumb path has an
        absolute path, False otherwise.
        Only the part of the path that comes before the first crumb argument
        is tested, i.e., the path is absolute if that part starts with an
        `op.sep` character or a hard disk letter.

        Raises
        ------
        ValueError: if the current crumb path is not valid.
        """
        if not self.is_valid(self._path):
            raise ValueError("The given crumb path has errors, got {}.".format(self.path))

        prefix = self._path.split(self._arg_start_sym)[0]
        return op.isabs(prefix)

    def abspath(self, first_is_basedir=False):
        """ Return a copy of `self` with an absolute crumb path.
        Add as prefix the absolute path to the current directory if the current
        crumb is not absolute. The ignore list of `self` is kept in the copy.

        Parameters
        ----------
        first_is_basedir: bool
            If True and the current crumb path starts with a crumb argument,
            the first argument will be replaced by the absolute path to the current dir,
            otherwise the absolute path to the current dir will be added as a prefix.

        Returns
        -------
        abs_crumb: Crumb

        Raises
        ------
        ValueError: if the current crumb path is not valid.
        """
        # `isabs` validates the path and raises the ValueError for us.
        if self.isabs():
            return deepcopy(self)

        return self.__class__(self._abspath(first_is_basedir=first_is_basedir),
                              ignore_list=self._ignore)

    def _path_split(self):
        """ Return the list of components of the current path, split by `op.sep`."""
        return self._path.split(op.sep)

    def _abspath(self, first_is_basedir=False):
        """ Return the absolute path of the current crumb path.

        Parameters
        ----------
        first_is_basedir: bool
            If True and the current crumb path starts with a crumb argument,
            the first argument will be replaced by the absolute path to the current dir,
            otherwise the absolute path to the current dir will be added as a prefix.

        Returns
        -------
        abspath: str
        """
        if not self.has_crumbs(self._path):
            return op.abspath(self._path)

        # `op.abspath` cannot be used from here on: it would normalise the
        # crumb arguments as if they were regular path components.
        if op.isabs(self._path):
            return self._path

        parts = self._path_split()
        # only a leading crumb argument may stand for the base directory
        if first_is_basedir and self._is_crumb_arg(parts[0]):
            parts = parts[1:]

        return op.sep.join([op.abspath(op.curdir)] + parts)

    def split(self):
        """ Return a list of sub-strings of the current crumb path where the
            path parts are separated from the crumb arguments.

        Returns
        -------
        crumbs: list of str
        """
        return self._split(self._path)

    @classmethod
    def from_path(cls, crumb_path):
        """ Create an instance of Crumb out of `crumb_path`.

        Parameters
        ----------
        crumb_path: str or Crumb or pathlib.Path

        Returns
        -------
        path: Crumb
            A new Crumb if `crumb_path` is a string. A Crumb or a
            pathlib.Path is returned as it is, without copying.

        Raises
        ------
        TypeError: if `crumb_path` is none of the accepted types.
        """
        if isinstance(crumb_path, (cls, Path)):
            return crumb_path

        if isinstance(crumb_path, string_types):
            return cls(crumb_path)

        raise TypeError("Expected a `val` to be a `str`, got {}.".format(type(crumb_path)))

    # def _set_replace_func(self):
    #     """ Set the fastest replace algorithm depending on how
    #     many arguments the path has."""
    #     self._replace = self._replace2
    #     if len(self._argidx) > 5:
    #         self._replace = self._replace1

    # def _replace2(self, start_sym='{', end_sym='}', **kwargs):
    #
    #     if start_sym != '{' or end_sym != '}':
    #         raise NotImplementedError
    #
    #     if not kwargs:
    #         return self._path
    #
    #     args = {v: self._arg_format(v) for v in self._argidx}
    #
    #     for k in kwargs:
    #         if k not in args:
    #             raise KeyError("Could not find argument {}"
    #                            " in `path` {}.".format(k, self._path))
    #
    #         args[k] = kwargs[k]
    #
    #     return self._path.format_map(args)

    def _lastarg(self):
        """ Return the name and idx of the last argument, or None if the
        crumb path has no arguments."""
        # self._argidx is an OrderedDict, so it can be walked backwards
        for arg in reversed(self._argidx):
            return arg, self._argidx[arg]
        return None

    def _firstarg(self):
        """ Return the name and idx of the first argument, or None if the
        crumb path has no arguments."""
        return next(iter(self._argidx.items()), None)

    def _is_firstarg(self, arg_name):
        """ Return True if `arg_name` is the first argument. Return False
        if it is not, or if the crumb path has no arguments."""
        # Take into account that self._argidx is OrderedDict
        first = self._firstarg()
        return first is not None and arg_name == first[0]

    def _arg_values(self, arg_name, arg_values=None):
        """ Return the existing values in the file system for the crumb argument
        with name `arg_name`.
        The `arg_values` must be a sequence with the tuples with valid values of the dependent
        (previous in the path) crumb arguments.
        The format of `arg_values` work in such a way that `self._path.format(dict(arg_values[0]))`
        would give me a valid path or crumb.

        Parameters
        ----------
        arg_name: str

        arg_values: list of sequences of 2-tuples

        Returns
        -------
        vals: list of lists of 2-tuples

        Raises
        ------
        ValueError: if `arg_values` is None and `arg_name` is not the
        first crumb argument in self._path

        IOError: if this crosses to any path that is non-existing.
        """
        if arg_values is None and not self._is_firstarg(arg_name):
            raise ValueError("Cannot get the list of values for {} if"
                             " the previous arguments are not filled"
                             " in `paths`.".format(arg_name))

        # `_argidx` stores positions in `self._path.split(op.sep)`, so the very
        # same split must be used here to keep the indices aligned.
        parts = self._path_split()
        aidx  = self._find_arg(arg_name)

        # the last component may be a file; any other one has to be a folder
        just_dirs = aidx != len(parts) - 1

        if arg_values is None:
            base = op.sep.join(parts[:aidx])
            if not base:
                # an empty prefix means the argument hangs from the root
                # folder, or it is the very first component of a relative path
                base = op.sep if aidx > 0 else op.curdir

            return [[(arg_name, val)]
                    for val in list_children(base, just_dirs=just_dirs, ignore=self._ignore)]

        vals = []
        for aval in arg_values:
            #  create the part of the crumb path that is already specified
            path = self._split(self._replace(self._path, **dict(aval)))[0]

            #  list the children of `path`
            subpaths = list_children(path, just_dirs=just_dirs, ignore=self._ignore)

            #  extend `vals` with the new list of values for `aval`
            vals.extend([list(aval) + [(arg_name, sp)] for sp in subpaths])

        return vals

    def replace(self, **kwargs):
        """ Return a copy of self with the crumb arguments in
        `kwargs` replaced by its values. The ignore list of `self` is kept
        in the copy.

        Parameters
        ----------
        kwargs: strings

        Returns
        -------
        crumb: Crumb

        Raises
        ------
        KeyError: if any name in `kwargs` is not a crumb argument of `self`.
        """
        for arg_name in kwargs:
            if arg_name not in self._argidx:
                raise KeyError("Expected `arg_name` to be one of ({}),"
                                 " got {}.".format(list(self._argidx), arg_name))

        return self.__class__(self._replace(self._path, **kwargs),
                              ignore_list=self._ignore)

    def _arg_deps(self, arg_name):
        """ Return a subdict of `self._argidx` with the
         crumb arguments that come before `arg_name` in the
         crumb path, `arg_name` itself included.

        Parameters
        ----------
        arg_name: str

        Returns
        -------
        arg_deps: Mapping[str, int]
            Empty if `arg_name` is not an argument of the crumb path.
        """
        argidx = self._find_arg(arg_name)
        return OrderedDict([(arg, idx) for arg, idx in self._argidx.items() if idx <= argidx])

    def values_map(self, arg_name, check_exists=False):
        """ Return a list of tuples of crumb arguments with their values.

        Parameters
        ----------
        arg_name: str

        check_exists: bool
            If True only the combinations of values whose path exists are returned.

        Returns
        -------
        values_map: list of lists of 2-tuples
        """
        values_map = None
        # unfold the arguments one by one, from the first one up to `arg_name`
        for arg in self._arg_deps(arg_name):
            values_map = self._arg_values(arg, values_map)

        if not check_exists:
            return values_map

        paths = [self.from_path(path) for path in self._build_paths(values_map)]
        return [args for args, path in zip(values_map, paths) if path.exists()]

    def _build_paths(self, values_map):
        """ Return a list of paths from each tuple of args from `values_map`

        Parameters
        ----------
        values_map: list of sequences of 2-tuple

        Returns
        -------
        paths: list of str
        """
        return [self._replace(self._path, **dict(val)) for val in values_map]

    def ls(self, arg_name, fullpath=True, rm_dups=False, make_crumbs=True, check_exists=False):
        """
        Return the list of values for the argument crumb `arg_name`.
        This will also unfold any other argument crumb that appears before in the
        path.

        Parameters
        ----------
        arg_name: str
            Name of the argument crumb to be unfolded.

        fullpath: bool
            If True will build the full path of the crumb path, will also append
            the rest of crumbs not unfolded.
            If False will only return the values for the argument with name
            `arg_name`.

        rm_dups: bool
            If True will remove and sort the duplicate values from the result.
            Otherwise it will leave it as it is.

        make_crumbs: bool
            If `fullpath` and `make_crumbs` is True will create a Crumb for
            each element of the result.

        check_exists: bool
            If True will return only str, Crumb or Path if it exists
            in the file path, otherwise it may create file paths
            that don't have to exist.

        Returns
        -------
        values: list of str or Crumb

        Raises
        ------
        ValueError: if `arg_name` is not an argument of the crumb path, or
        if `make_crumbs` is True and `fullpath` is False.

        NotImplementedError: if the crumb path starts with an argument.

        Examples
        --------
        >>> cr = Crumb(op.join(op.expanduser('~'), '{user_folder}'))
        >>> user_folders = cr.ls('user_folder', fullpath=True, rm_dups=True, make_crumbs=True)
        """
        if arg_name not in self._argidx:
            raise ValueError("Expected `arg_name` to be one of ({}),"
                             " got {}.".format(list(self._argidx), arg_name))

        # if the first chunk of the path is a parameter, I am not interested in this (for now)
        if self._path.startswith(self._arg_start_sym):
            raise NotImplementedError("Can't list paths that starts"
                                      " with an argument.")

        if make_crumbs and not fullpath:
            raise ValueError("`make_crumbs` can only work if `fullpath` is also True.")

        values_map = self.values_map(arg_name, check_exists=check_exists)

        if fullpath:
            paths = self._build_paths(values_map)
        else:
            paths = [dict(val)[arg_name] for val in values_map]

        if rm_dups:
            paths = remove_duplicates(paths)

        # `make_crumbs` implies `fullpath` here, see the check above
        if make_crumbs:
            paths = sorted([self.from_path(path) for path in paths])

        return paths

    def _remaining_deps(self, arg_names):
        """ Return the name of the arguments that are dependencies of `arg_names`,
        i.e., the arguments that are not in `arg_names` and come before the
        last of `arg_names` in the crumb path.

        Parameters
        ----------
        arg_names: Sequence[str]

        Returns
        -------
        rem_deps: Sequence[str]
            In reverse order of appearance in the crumb path.
        """
        started = False
        rem_deps = []
        for an in reversed(list(self._argidx.keys())):  # take into account that argidx is ordered
            if an in arg_names:
                started = True
            elif started:
                rem_deps.append(an)

        return rem_deps

    def touch(self):
        """ Call the `_touch` helper on the current crumb path and return
        its result.
        According to the original notes of this method, it creates a leaf
        directory and all intermediate ones using the non crumbed part of
        the crumb path (to be confirmed against `_utils._touch`).

        Returns
        -------
        nupath: str
            The new path created.
        """
        return self._touch(self._path)

    def joinpath(self, suffix):
        """ Return a copy of the current crumb with the `suffix` path appended.
        If suffix has crumb arguments, the whole crumb will be updated.
        The ignore list of `self` is kept in the copy.

        Parameters
        ----------
        suffix: str

        Returns
        -------
        cr: Crumb
        """
        return self.__class__(op.join(self._path, suffix), ignore_list=self._ignore)

    def exists(self):
        """ Return True if the current crumb path is a possibly existing path,
        False otherwise.

        Returns
        -------
        exists: bool
        """
        # nothing to unfold: this is a plain path
        if not self.has_crumbs(self._path) or not self._argidx:
            return op.exists(str(self)) or op.islink(str(self))

        if not op.exists(self.split()[0]):
            return False

        last, _ = self._lastarg()
        paths = self.ls(last,
                        fullpath     = True,
                        make_crumbs  = False,
                        rm_dups      = True,
                        check_exists = False)

        return all(self._split_exists(lp) for lp in paths)

    def has_files(self):
        """ Return True if the current crumb path has any file in its
        possible paths.

        Returns
        -------
        has_files: bool
        """
        if not op.exists(self.split()[0]):
            return False

        # without arguments there is only one possible path: the path itself
        if not self._argidx:
            return op.isfile(self._path)

        last, _ = self._lastarg()
        paths = self.ls(last,
                        fullpath     = True,
                        make_crumbs  = True,
                        rm_dups      = False,
                        check_exists = True)

        return any(op.isfile(str(lp)) for lp in paths)

    def unfold(self):
        """ Return a list of all the existing paths until the last crumb argument.
        If the crumb path has no arguments the result is a list with a copy
        of `self` if its path exists, an empty list otherwise.

        Returns
        -------
        paths: list of Crumb
        """
        last = self._lastarg()
        if last is None:
            return [self.copy(self)] if self.exists() else []

        return self.ls(last[0],
                       fullpath    = True,
                       rm_dups     = True,
                       make_crumbs = True,
                       check_exists= True)

    def __getitem__(self, arg_name):
        """ Return the existing values of the crumb argument `arg_name`
        without removing duplicates.

        Parameters
        ----------
        arg_name: str

        Returns
        -------
        values: list of str
        """
        return self.ls(arg_name,
                       fullpath    = False,
                       rm_dups     = False,
                       make_crumbs = False,
                       check_exists= True)

    def __setitem__(self, key, value):
        """ Replace in place the crumb argument `key` by `value`.

        Raises
        ------
        KeyError: if `key` is not an argument of the crumb path.
        """
        if key not in self._argidx:
            raise KeyError("Expected `arg_name` to be one of ({}),"
                           " got {}.".format(list(self._argidx), key))

        self._path = self._replace(self._path, **{key: value})
        self._update()

    # The ordering operators compare the path string against `str(other)`.
    def __ge__(self, other):
        return self._path >= str(other)

    def __le__(self, other):
        return self._path <= str(other)

    def __gt__(self, other):
        return self._path > str(other)

    def __lt__(self, other):
        return self._path < str(other)

    def __hash__(self):
        return hash(self._path)

    def __contains__(self, item):
        """ Return True if `item` is the name of an argument of the crumb path."""
        return item in self._argidx

    def __repr__(self):
        return '{}("{}")'.format(type(self).__name__, self._path)

    def __str__(self):
        return str(self._path)

    def __eq__(self, other):
        """ Return True if `self` and `other` are equal, False otherwise.
        Two crumbs are equal if they have the same path, arguments and
        ignore list.

        Parameters
        ----------
        other: Crumb

        Returns
        -------
        is_equal: bool
            `NotImplemented` if `other` is not a Crumb, so Python falls back
            to its default comparison instead of failing.
        """
        if not isinstance(other, Crumb):
            return NotImplemented

        return (self._path   == other._path   and
                self._argidx == other._argidx and
                self._ignore == other._ignore)

    def __ne__(self, other):
        """ Return the negation of `__eq__`. Python 2 does not derive it."""
        is_equal = self.__eq__(other)
        if is_equal is NotImplemented:
            return is_equal
        return not is_equal
637
638