Completed
Push — master ( b034e1...6d9b57 )
by Alexandre M.
01:42
created

hansel.Crumb._clean()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
# -*- coding: utf-8 -*-
2
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
3
# vi: set ft=python sts=4 ts=4 sw=4 et:
4
"""
Crumb class: the smart path model class.
"""
7
8
import os
9
import os.path     as op
10
from   copy        import deepcopy
11
from   collections import OrderedDict, Mapping, Sequence
12
from   pathlib     import Path
13
14
from   six import string_types
15
16
from   hansel.utils import remove_duplicates, list_children
17
18
19
class Crumb(object):
    """ The crumb path model class.

    A crumb path is a file or folder path where some of the components
    (the parts between `op.sep`) are "crumb arguments": names enclosed in
    curly braces, e.g., `{subject_id}`. The instance keeps the path string
    and an ordered mapping from argument name to the index of the path
    component where that argument is.

    Instances define `__eq__` without `__hash__`, so they are not hashable.

    Parameters
    ----------
    crumb_path: str or Crumb
        A file or folder path with crumb arguments. See Examples.

    Raises
    ------
    TypeError: if `crumb_path` is neither a string nor a Crumb.

    ValueError: if `crumb_path` is not a well formed crumb path.

    Examples
    --------
    >>> crumb = Crumb("{base_dir}/raw/{subject_id}/{session_id}/{modality}/{image}")
    >>> cr = Crumb(op.join(op.expanduser('~'), '{user_folder}'))
    """
    _arg_start_sym = '{'
    _arg_end_sym   = '}'

    def __init__(self, crumb_path):
        self._path   = self._get_path(crumb_path)
        self._argidx = OrderedDict()
        self._update()

    @property
    def path(self):
        """Return the current crumb path string."""
        return self._path

    @path.setter
    def path(self, value):
        """ Set the current crumb path string and update the internal members.

        Parameters
        ----------
        value: str or Crumb
            A file or folder path with crumb arguments. See Examples in class docstring.

        Raises
        ------
        TypeError: if `value` is neither a string nor a Crumb.

        ValueError: if `value` is not a well formed crumb path.
        """
        # normalise through `_get_path`, exactly as `__init__` does
        self._path = self._get_path(value)
        self._update()

    def _check(self):
        """ Raise a ValueError if the current crumb path is not well formed."""
        if not self.is_valid(self._path):
            raise ValueError("The current crumb path has errors, got {}.".format(self.path))

    def _update(self):
        """ Clean up, parse the current crumb path and fill the internal
        members for functioning."""
        self._clean()
        self._check()
        self._set_argidx()
        self._set_replace_func()

    def _clean(self):
        """ Clean up the private utility members, i.e., _argidx. """
        self._argidx = OrderedDict()

    @classmethod
    def _arg_name(cls, arg):
        """ Return the name of the argument given its crumb representation.

        Parameters
        ----------
        arg: str
            The crumb representation of the argument, e.g., "{sample_id}".

        Returns
        -------
        arg_name: str
            `arg` without its first and last characters.

        Raises
        ------
        ValueError: if `arg` is not a well formed crumb argument.
        """
        if not cls._is_crumb_arg(arg):
            raise ValueError("Expected an well formed crumb argument, "
                             "got {}.".format(arg))
        return arg[1:-1]

    def _arg_format(self, arg_name):
        """ Return the argument for its string `format()` representation.

        Parameters
        ----------
        arg_name: str

        Returns
        -------
        arg_format: str
            `arg_name` enclosed in curly braces.
        """
        return '{' + arg_name + '}'

    def __eq__(self, other):
        """ Return True if `self` and `other` are equal, False otherwise.

        Two crumbs are equal if they have the same path string and the same
        argument index mapping.

        Parameters
        ----------
        other: Crumb

        Returns
        -------
        is_equal: bool
            `NotImplemented` is returned if `other` is not a Crumb, so that
            Python can fall back to its default comparison.
        """
        if not isinstance(other, Crumb):
            return NotImplemented

        return self._path == other._path and self._argidx == other._argidx

    @classmethod
    def copy(cls, crumb):
        """ Return a copy of the given `crumb`.

        Parameters
        ----------
        crumb: str or Crumb

        Returns
        -------
        copy: Crumb or pathlib.Path
            A new Crumb if `crumb` is a Crumb. If `crumb` is a string the
            result of `from_path` is returned, i.e., a pathlib.Path when
            the string has no crumb arguments.

        Raises
        ------
        TypeError: if `crumb` is neither a Crumb nor a string.
        """
        if isinstance(crumb, cls):
            return cls(crumb._path)

        if isinstance(crumb, string_types):
            return cls.from_path(crumb)

        raise TypeError("Expected a Crumb or a str to copy, got {}.".format(type(crumb)))

    def _set_argidx(self):
        """ Initialize the self._argidx dict. It holds arg_name -> index.
        The index is the position in the whole `_path.split(op.sep)` where each argument is.
        """
        for idx, piece in enumerate(self._path_split()):
            if self._is_crumb_arg(piece):
                self._argidx[self._arg_name(piece)] = idx

    def _set_replace_func(self):
        """ Set the replace algorithm depending on how many arguments the
        path has: `_replace1` for more than 5 arguments, `_replace2` otherwise."""
        self._replace = self._replace1 if len(self._argidx) > 5 else self._replace2

    def _find_arg(self, arg_name):
        """ Return the index in the current path of the crumb
        argument with name `arg_name`, or -1 if there is no such argument.
        """
        return self._argidx.get(arg_name, -1)

    def isabs(self):
        """ Return True if the current crumb path has an
        absolute path, False otherwise.
        The check is done with `op.isabs` on the part of the path that
        comes before the first crumb argument.

        Raises
        ------
        ValueError: if the current crumb path is not well formed.
        """
        if not self.is_valid(self._path):
            raise ValueError("The given crumb path has errors, got {}.".format(self.path))

        prefix = self._path.split(self._arg_start_sym)[0]
        return op.isabs(prefix)

    def abspath(self, first_is_basedir=False):
        """ Return a copy of `self` with an absolute crumb path.
        Add as prefix the absolute path to the current directory if the current
        crumb is not absolute.

        Parameters
        ----------
        first_is_basedir: bool
            If True and the current crumb path starts with a crumb argument,
            the first argument will be replaced by the absolute path to the current dir,
            otherwise the absolute path to the current dir will be added as a prefix.

        Returns
        -------
        abs_crumb: Crumb or pathlib.Path
            A pathlib.Path is returned if the resulting path has no crumb
            arguments left.

        Raises
        ------
        ValueError: if the current crumb path is not well formed.
        """
        if not self.is_valid(self._path):
            raise ValueError("The given crumb path has errors, got {}.".format(self.path))

        if self.isabs():
            return deepcopy(self)

        return self.copy(self._abspath(first_is_basedir=first_is_basedir))

    def _path_split(self):
        """ Return the list of components of the current path, split by `op.sep`."""
        return self._path.split(op.sep)

    def _abspath(self, first_is_basedir=False):
        """ Return the absolute path of the current crumb path.

        Parameters
        ----------
        first_is_basedir: bool
            If True and the current crumb path starts with a crumb argument,
            the first argument will be replaced by the absolute path to the current dir,
            otherwise the absolute path to the current dir will be added as a prefix.

        Returns
        -------
        abspath: str
        """
        if not self.has_crumbs(self._path):
            return op.abspath(self._path)

        # already absolute: nothing to prepend
        if op.isabs(self._path):
            return self._path

        parts = self._path_split()

        # only a leading crumb argument can stand for the base directory;
        # a plain leading component must never be dropped
        if first_is_basedir and self._is_crumb_arg(parts[0]):
            parts = parts[1:]

        return op.sep.join([op.abspath(op.curdir)] + parts)

    def split(self):
        """ Return a list of sub-strings of the current crumb path where the
            path parts are separated from the crumb arguments.

        Returns
        -------
        crumbs: list of str
        """
        return self._split(self._path)

    def _default_map(self):
        """ Return the dict with the default format values of the
            crumb arguments, i.e., arg_name -> "{arg_name}"."""
        return {name: self._arg_format(name) for name in self._argidx}

    @classmethod
    def _split(cls, crumb_path):
        """ Return a list of sub-strings of `crumb_path` where the
            path parts are separated from the crumb arguments.

        Each crumb argument is preceded in the result by the plain path
        accumulated before it (an empty string if there is none).
        NOTE: plain components that come after the last crumb argument are
        not included in the result, and a path without crumb arguments
        gives an empty list.

        Parameters
        ----------
        crumb_path: str or Crumb

        Returns
        -------
        crumbs: list of str
        """
        crumb_path = cls._get_path(crumb_path)

        chunks = []
        plain = op.sep if crumb_path.startswith(op.sep) else ''
        for piece in crumb_path.split(op.sep):
            if piece.startswith(cls._arg_start_sym):
                chunks.append(plain)
                chunks.append(piece)
                plain = ''
            else:
                plain = op.join(plain, piece)

        return chunks

    @classmethod
    def is_valid(cls, crumb_path):
        """ Return True if `crumb_path` is a well formed path with crumb arguments,
        False otherwise.

        Parameters
        ----------
        crumb_path: str or Crumb

        Returns
        -------
        is_valid: bool
        """
        crumb_path = cls._get_path(crumb_path)

        for piece in crumb_path.split(op.sep):
            # NOTE: a component that is, on its own, an existing directory
            # (relative to the current dir) is accepted without more checks.
            if op.isdir(piece):
                continue

            name = cls._arg_name(piece) if cls._is_crumb_arg(piece) else piece

            # no start/end symbols are allowed besides the enclosing ones
            if cls._arg_start_sym in name or cls._arg_end_sym in name:
                return False

        return True

    @classmethod
    def _is_crumb_arg(cls, crumb_arg):
        """ Returns True if `crumb_arg` is a well formed
        crumb argument.

        Parameters
        ----------
        crumb_arg: str
            The string representing a crumb argument, e.g., "{sample_id}"

        Returns
        -------
        is_crumb_arg: bool
            False if `crumb_arg` is not a string.
        """
        if not isinstance(crumb_arg, string_types):
            return False

        return crumb_arg.startswith(cls._arg_start_sym) and crumb_arg.endswith(cls._arg_end_sym)

    @classmethod
    def has_crumbs(cls, crumb_path):
        """ Return True if the `crumb_path.split(op.sep)` has item which is a crumb argument
        that starts with '{' and ends with '}'."""
        crumb_path = cls._get_path(crumb_path)
        return any(cls._is_crumb_arg(piece) for piece in crumb_path.split(op.sep))

    @classmethod
    def _get_path(cls, crumb_path):
        """ Return the path string from `crumb_path`.

        Parameters
        ----------
        crumb_path: str or Crumb

        Returns
        -------
        path: str

        Raises
        ------
        TypeError: if `crumb_path` is neither a string nor a Crumb.
        """
        if isinstance(crumb_path, cls):
            crumb_path = crumb_path._path

        if not isinstance(crumb_path, string_types):
            raise TypeError("Expected `crumb_path` to be a {}, got {}.".format(string_types, type(crumb_path)))

        return crumb_path

    @classmethod
    def from_path(cls, crumb_path):
        """ Create an instance of Crumb or pathlib.Path out of `crumb_path`.
        It will return a Crumb if `crumb_path` has crumbs or
        a pathlib.Path otherwise. A Crumb or a pathlib.Path is returned as is.

        Parameters
        ----------
        crumb_path: str, Crumb or pathlib.Path

        Returns
        -------
        path: Crumb or pathlib.Path

        Raises
        ------
        TypeError: if `crumb_path` is none of the accepted types.
        """
        if isinstance(crumb_path, (cls, Path)):
            return crumb_path

        if not isinstance(crumb_path, string_types):
            raise TypeError("Expected a `val` to be a `str`, got {}.".format(type(crumb_path)))

        if cls.has_crumbs(crumb_path):
            return cls(crumb_path)

        return Path(crumb_path)

    def _replace1(self, **kwargs):
        """ Return the current path string with the arguments in `kwargs`
        replaced by their values, using `str.format_map`.

        Raises
        ------
        KeyError: if any name in `kwargs` is not a crumb argument of the path.
        """
        if not kwargs:
            return self._path

        args = self._default_map()
        for name, value in kwargs.items():
            if name not in args:
                raise KeyError("Could not find argument {}"
                               " in `path` {}.".format(name, self._path))

            args[name] = value

        return self._path.format_map(args)

    def _replace2(self, **kwargs):
        """ Return the current path string with the arguments in `kwargs`
        replaced by their values, using successive `str.replace` calls.

        Raises
        ------
        KeyError: if the crumb representation of any name in `kwargs` is
        not found in the path.
        """
        if not kwargs:
            return self._path

        path = self._path
        for name, value in kwargs.items():
            karg = self._arg_format(name)
            # look for the whole "{name}", a bare name could match any
            # other piece of the path
            if karg not in path:
                raise KeyError("Could not find argument {} in"
                               " `path` {}.".format(name, self._path))

            path = path.replace(karg, value)

        return path

    def _lastarg(self):
        """ Return the name and idx of the last argument, or None if the
        path has no arguments."""
        items = list(self._argidx.items())
        return items[-1] if items else None

    def _firstarg(self):
        """ Return the name and idx of the first argument, or None if the
        path has no arguments."""
        return next(iter(self._argidx.items()), None)

    def _is_firstarg(self, arg_name):
        """ Return True if `arg_name` is the first argument."""
        # Take into account that self._argidx is OrderedDict
        first = self._firstarg()
        return first is not None and arg_name == first[0]

    def _arg_values(self, arg_name, arg_values=None):
        """ Return the existing values in the file system for the crumb argument
        with name `arg_name`.
        The `arg_values` must be a sequence with the tuples with valid values of the dependent
        (previous in the path) crumb arguments.
        The format of `arg_values` work in such a way that `self._path.format(dict(arg_values[0]))`
        would give me a valid path or crumb.

        Parameters
        ----------
        arg_name: str

        arg_values: list of lists of (arg_name, value) tuples

        Returns
        -------
        vals: list of lists of (arg_name, value) tuples

        Raises
        ------
        ValueError: if `arg_values` is None and `arg_name` is not the
        first crumb argument in self._path

        NOTE(review): the original documentation also mentions an IOError
        when a non-existing path is crossed; that would come from
        `list_children`, which is not defined here -- confirm.
        """
        if arg_values is None and not self._is_firstarg(arg_name):
            raise ValueError("Cannot get the list of values for {} if"
                             " the previous arguments are not filled"
                             " in `paths`.".format(arg_name))

        aidx = self._find_arg(arg_name)

        # the indices in `_argidx` refer to the split of the path as it is,
        # so the same split must be used here
        splt = self._path_split()

        # the last component can be a file, any other one must be a folder
        just_dirs = aidx != len(splt) - 1

        if arg_values is None:
            base = op.sep.join(splt[:aidx])
            return [[(arg_name, val)] for val in list_children(base, just_dirs=just_dirs)]

        vals = []
        for aval in arg_values:
            #  create the part of the crumb path that is already specified
            path = self._split(self._replace(**dict(aval)))[0]

            #  list the children of `path`
            subpaths = list_children(path, just_dirs=just_dirs)

            #  extend `val` tuples with the new list of values for `aval`
            vals.extend([aval + [(arg_name, sp)] for sp in subpaths])

        return vals

    def replace(self, **kwargs):
        """ Return a copy of self with the crumb arguments in
        `kwargs` replaced by its values.

        Parameters
        ----------
        kwargs: strings

        Returns
        -------
        crumb: Crumb or pathlib.Path
            A pathlib.Path is returned if no crumb arguments are left.

        Raises
        ------
        KeyError: if any name in `kwargs` is not a crumb argument of the path.
        """
        for arg_name in kwargs:
            if arg_name not in self._argidx:
                raise KeyError("Expected `arg_name` to be one of ({}),"
                               " got {}.".format(list(self._argidx), arg_name))

        cr = self.copy(self)
        cr._path = cr._replace(**kwargs)
        return Crumb.from_path(cr._path)

    def _arg_deps(self, arg_name):
        """ Return a subdict of `self._argidx` with the
         values from the crumb arguments that come before
         `arg_name` in the crumb path, `arg_name` itself included.

        Parameters
        ----------
        arg_name: str

        Returns
        -------
        arg_deps: Mapping[str, int]
        """
        argidx = self._find_arg(arg_name)
        return OrderedDict([(arg, idx) for arg, idx in self._argidx.items() if idx <= argidx])

    def ls(self, arg_name, fullpath=True, duplicates=True, make_crumbs=True, check_exists=False):
        """
        Return the list of values for the argument crumb `arg_name`.
        This will also unfold any other argument crumb that appears before in the
        path.

        Parameters
        ----------
        arg_name: str
            Name of the argument crumb to be unfolded.

        fullpath: bool
            If True will build the full path of the crumb path, will also append
            the rest of crumbs not unfolded.
            If False will only return the values for the argument with name
            `arg_name`.

        duplicates: bool
            If False will remove the duplicate values from the result.
            Otherwise it will leave it as it is.

        make_crumbs: bool
            If `fullpath` and `make_crumbs` is True will create a Crumb or a pathlib.Path
            for each element of the result. This will depend on whether the result item
            still has crumb arguments or not.

        check_exists: bool
            If True will return only str, Crumb or Path if it exists
            in the file path, otherwise it may create file paths
            that don't have to exist.

        Returns
        -------
        values: list of str, Crumb or pathlib.Path

        Raises
        ------
        ValueError: if `arg_name` is not a crumb argument of the path, or if
        `make_crumbs` is True and `fullpath` is False.

        NotImplementedError: if the path starts with a crumb argument.

        Examples
        --------
        >>> cr = Crumb(op.join(op.expanduser('~'), '{user_folder}'))
        >>> user_folders = cr.ls('user_folder', fullpath=True, duplicates=True, make_crumbs=True)
        """
        if arg_name not in self._argidx:
            raise ValueError("Expected `arg_name` to be one of ({}),"
                             " got {}.".format(list(self._argidx), arg_name))

        # if the first chunk of the path is a parameter, I am not interested in this (for now)
        if self._path.startswith(self._arg_start_sym):
            raise NotImplementedError("Can't list paths that starts"
                                      " with an argument.")

        if make_crumbs and not fullpath:
            raise ValueError("`make_crumbs` can only work if `fullpath` is also True.")

        # unfold every argument up to `arg_name`, feeding each step with
        # the values found for the previous ones
        values_map = None
        for arg in self._arg_deps(arg_name):
            values_map = self._arg_values(arg, values_map)

        lister = self._ls_check_exists if check_exists else self._ls_no_check_exists
        return lister(arg_name,
                      values_map=values_map,
                      fullpath=fullpath,
                      duplicates=duplicates,
                      make_crumbs=make_crumbs)

    def _ls_no_check_exists(self, arg_name, values_map, fullpath, duplicates, make_crumbs):
        """ Build the result of `ls` from `values_map` without checking that
        the resulting paths exist. See `ls` for the parameters."""
        if fullpath:  # this means we have to build the full paths
            values = [self._replace(**dict(val)) for val in values_map]
        else:  # this means we can return the list of crumbs directly
            values = [dict(val)[arg_name] for val in values_map]

        if not duplicates:
            values = remove_duplicates(values)

        if fullpath and make_crumbs:
            values = [self.from_path(val) for val in values]

        return values

    def _ls_check_exists(self, arg_name, values_map, fullpath, duplicates, make_crumbs):
        """ Build the result of `ls` from `values_map` keeping only the items
        whose `exists()` is True. See `ls` for the parameters."""
        paths = [self._replace(**dict(val)) for val in values_map]

        # remove duplicates while the items are still plain strings
        if not duplicates:
            paths = remove_duplicates(paths)

        paths = [self.from_path(val) for val in paths]
        paths = [val for val in paths if val.exists()]

        if fullpath:
            if make_crumbs:
                return paths

            return [str(val) for val in paths]

        argidx = self._argidx[arg_name]
        values = [str(val).split(op.sep)[argidx] for val in paths]

        # the same value can show up under different parent folders
        if not duplicates:
            values = remove_duplicates(values)

        return values

    def _remaining_deps(self, arg_names):
        """ Return the name of the arguments that are dependencies of `arg_names`.
        These are the arguments not in `arg_names` that come, in the path,
        before any of the arguments in `arg_names`. They are given from the
        last to the first.

        Parameters
        ----------
        arg_names: Sequence[str]

        Returns
        -------
        rem_deps: Sequence[str]
        """
        started = False
        rem_deps = []
        for name in reversed(list(self._argidx.keys())):  # take into account that argidx is ordered
            if name in arg_names:
                started = True
            elif started:
                rem_deps.append(name)

        return rem_deps

    def touch(self, exist_ok=True):
        """ Create a leaf directory and all intermediate ones
        using the non crumbed part of the current crumb path.
        If the target directory already exists, raise an IOError
        if exist_ok is False. Otherwise no exception is raised.

        Parameters
        ----------
        exist_ok: bool
            Default = True

        Returns
        -------
        nupath: str
            The new path created.
        """
        return self._touch(self._path, exist_ok=exist_ok)

    @classmethod
    def _touch(cls, crumb_path, exist_ok=True):
        """ Create a leaf directory and all intermediate ones
        using the non crumbed part of `crumb_path`.
        If the target directory already exists, raise an IOError
        if exist_ok is False. Otherwise no exception is raised.

        Parameters
        ----------
        crumb_path: str

        exist_ok: bool
            Default = True

        Returns
        -------
        nupath: str
            The new path created.
        """
        if cls.has_crumbs(crumb_path):
            nupath = cls._split(crumb_path)[0]
        else:
            nupath = crumb_path

        if op.exists(nupath) and not exist_ok:
            raise IOError("Folder {} already exists.".format(nupath))

        # any error from `makedirs` is left to propagate to the caller
        os.makedirs(nupath, exist_ok=exist_ok)
        return nupath

    def exists(self):
        """ Return True if the current crumb path is a possibly existing path,
        False otherwise.

        Returns
        -------
        exists: bool
        """
        # without arguments there is nothing to unfold: check the path itself
        if not self._argidx:
            return op.exists(self._path) or op.islink(self._path)

        if not op.exists(self.split()[0]):
            return False

        last, _ = self._lastarg()
        paths = self.ls(last,
                        fullpath     = True,
                        make_crumbs  = False,
                        duplicates   = True,
                        check_exists = False)

        return all(self._split_exists(lp) for lp in paths)

    def has_files(self):
        """ Return True if the current crumb path has any file in its
        possible paths.

        Returns
        -------
        has_files: bool
        """
        # without arguments there is nothing to unfold: check the path itself
        if not self._argidx:
            return op.isfile(self._path)

        if not op.exists(self.split()[0]):
            return False

        last, _ = self._lastarg()
        paths = self.ls(last,
                        fullpath     = True,
                        make_crumbs  = True,
                        duplicates   = True,
                        check_exists = True)

        return any(op.isfile(str(lp)) for lp in paths)

    def unfold(self):
        """ Return a list of all the existing paths until the last crumb argument.

        Returns
        -------
        paths: list of pathlib.Path
        """
        # without arguments the only candidate is the path itself
        if not self._argidx:
            return [Path(self._path)] if op.exists(self._path) else []

        return self.ls(self._lastarg()[0],
                       fullpath    = True,
                       duplicates  = False,
                       make_crumbs = True,
                       check_exists= True)

    @classmethod
    def _split_exists(cls, crumb_path):
        """ Return True if the part without crumb arguments of `crumb_path`
        is an existing path or a symlink, False otherwise.

        Returns
        -------
        exists: bool
        """
        if cls.has_crumbs(crumb_path):
            rpath = cls._split(crumb_path)[0]
        else:
            rpath = str(crumb_path)

        return op.exists(rpath) or op.islink(rpath)

    def __getitem__(self, item):
        """ Return the existing values, without duplicates, of the crumb
        argument `item`. See `ls`."""
        return self.ls(item,
                       fullpath    = False,
                       duplicates  = False,
                       make_crumbs = False,
                       check_exists= True)

    def __setitem__(self, key, value):
        """ Replace in place the crumb argument `key` by `value` and update
        the internal members.

        Raises
        ------
        KeyError: if `key` is not a crumb argument of the path.
        """
        if key not in self._argidx:
            raise KeyError("Expected `arg_name` to be one of ({}),"
                           " got {}.".format(list(self._argidx), key))

        self._path = self._replace(**{key: value})
        self._update()

    def __contains__(self, item):
        """ Return True if `item` is the name of a crumb argument of the path."""
        return item in self._argidx

    def __repr__(self):
        # `type(self)` so that subclasses show their own name
        return '{}("{}")'.format(type(self).__name__, self._path)

    def __str__(self):
        return str(self._path)
755
756
757