ProgressiveFetchExtract   F
last analyzed

Complexity

Total Complexity 61

Size/Duplication

Total Lines 226
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 226
rs 3.52
c 1
b 0
f 0
wmc 61

11 Methods

Rating   Name   Duplication   Size   Complexity  
A extract_actions() 0 3 3
A progress_update_cache_axn() 0 2 1
A cache_actions() 0 3 3
A progress_update_extract_axn() 0 2 1
A __hash__() 0 2 1
A __eq__() 0 2 1
A __init__() 0 18 2
A prepare() 0 8 3
F make_actions_for_record() 0 96 18
F execute() 0 34 15
F _execute_actions() 0 50 15

How to fix   Complexity   

Complex Class

Complex classes like ProgressiveFetchExtract often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2012 Anaconda, Inc
3
# SPDX-License-Identifier: BSD-3-Clause
4
from __future__ import absolute_import, division, print_function, unicode_literals
5
6
from errno import EACCES, ENOENT, EPERM
7
from functools import reduce
8
from logging import getLogger
9
from os import listdir
10
from os.path import basename, dirname, join
11
from tarfile import ReadError
12
13
from .path_actions import CacheUrlAction, ExtractPackageAction
14
from .. import CondaError, CondaMultiError, conda_signal_handler
15
from .._vendor.auxlib.collection import first
16
from .._vendor.auxlib.decorators import memoizemethod
17
from ..base.constants import CONDA_TARBALL_EXTENSION, PACKAGE_CACHE_MAGIC_FILE
18
from ..base.context import context
19
from ..common.compat import (JSONDecodeError, iteritems, itervalues, odict, string_types,
20
                             text_type, with_metaclass)
21
from ..common.constants import NULL
22
from ..common.io import ProgressBar, time_recorder
23
from ..common.path import expand, url_to_path
24
from ..common.signals import signal_handler
25
from ..common.url import path_to_url
26
from ..exceptions import NoWritablePkgsDirError, NotWritableError
27
from ..gateways.disk.create import (create_package_cache_directory, extract_tarball,
28
                                    write_as_json_to_file)
29
from ..gateways.disk.delete import rm_rf
30
from ..gateways.disk.read import (compute_md5sum, isdir, isfile, islink, read_index_json,
31
                                  read_index_json_from_tarball, read_repodata_json)
32
from ..gateways.disk.test import file_path_is_writable
33
from ..models.match_spec import MatchSpec
34
from ..models.records import PackageCacheRecord, PackageRecord, PackageRef
35
from ..utils import human_bytes
36
37
try:
38
    from cytoolz.itertoolz import concat, concatv, groupby
39
except ImportError:  # pragma: no cover
40
    from .._vendor.toolz.itertoolz import concat, concatv, groupby  # NOQA
41
42
log = getLogger(__name__)
43
44
45
class PackageCacheType(type):
    """
    Metaclass that memoizes PackageCacheData instances, one per pkgs_dir.
    """

    def __call__(cls, pkgs_dir):
        # Passing an existing instance straight through is a no-op.
        if isinstance(pkgs_dir, PackageCacheData):
            return pkgs_dir
        try:
            return PackageCacheData._cache_[pkgs_dir]
        except KeyError:
            instance = super(PackageCacheType, cls).__call__(pkgs_dir)
            PackageCacheData._cache_[pkgs_dir] = instance
            return instance
59
60
61
@with_metaclass(PackageCacheType)
62
class PackageCacheData(object):
63
    _cache_ = {}
64
65
    def __init__(self, pkgs_dir):
        """Initialize a cache view over a single package cache directory."""
        self.pkgs_dir = pkgs_dir
        # Lazily populated by load() via the _package_cache_records property.
        self.__package_cache_records = None
        # NULL sentinel means "not yet determined"; see is_writable / _check_writable.
        self.__is_writable = NULL

        self._urls_data = UrlsData(pkgs_dir)
71
72
    def insert(self, package_cache_record):
        """Register *package_cache_record* in this cache, persisting its
        info/repodata_record.json next to the extracted package."""
        record_path = join(
            package_cache_record.extracted_package_dir, 'info', 'repodata_record.json')
        write_as_json_to_file(record_path, PackageRecord.from_objects(package_cache_record))

        self._package_cache_records[package_cache_record] = package_cache_record
78
79
    def load(self):
        """(Re)scan pkgs_dir and rebuild the in-memory record index."""
        records = {}
        self.__package_cache_records = records
        self._check_writable()  # side effect: creates the cache dir if it doesn't exist
        if not isdir(self.pkgs_dir):
            # Directory is absent and we lacked permission to create it.
            return

        for base_name in self._dedupe_pkgs_dir_contents(listdir(self.pkgs_dir)):
            full_path = join(self.pkgs_dir, base_name)
            if islink(full_path):
                continue
            looks_extracted = isdir(full_path) and isfile(join(full_path, 'info', 'index.json'))
            looks_tarball = isfile(full_path) and full_path.endswith(CONDA_TARBALL_EXTENSION)
            if looks_extracted or looks_tarball:
                record = self._make_single_record(base_name)
                if record:
                    records[record] = record
95
96
    def reload(self):
        """Force a rescan of pkgs_dir; returns self for call chaining."""
        self.load()
        return self
99
100
    def get(self, package_ref, default=NULL):
        """Look up *package_ref*; return *default* if given, else raise KeyError."""
        assert isinstance(package_ref, PackageRef)
        try:
            return self._package_cache_records[package_ref]
        except KeyError:
            if default is NULL:
                raise
            return default
109
110
    def remove(self, package_ref, default=NULL):
        """Pop *package_ref* from the index; KeyError semantics follow dict.pop."""
        if default is NULL:
            return self._package_cache_records.pop(package_ref)
        return self._package_cache_records.pop(package_ref, default)
115
116
    def query(self, package_ref_or_match_spec):
        """Return a generator of cached records matching a MatchSpec, a spec
        string, or a PackageRef."""
        query_obj = package_ref_or_match_spec
        if isinstance(query_obj, string_types):
            query_obj = MatchSpec(query_obj)
        if isinstance(query_obj, MatchSpec):
            return (rec for rec in itervalues(self._package_cache_records)
                    if query_obj.match(rec))
        assert isinstance(query_obj, PackageRef)
        return (rec for rec in itervalues(self._package_cache_records) if rec == query_obj)
127
128
    def iter_records(self):
        """Iterate over the cached PackageCacheRecord keys."""
        return iter(self._package_cache_records)
130
131
    @classmethod
    def query_all(cls, package_ref_or_match_spec, pkgs_dirs=None):
        """Query every configured package cache, writable caches first."""
        if pkgs_dirs is None:
            pkgs_dirs = context.pkgs_dirs
        caches = cls.all_caches_writable_first(pkgs_dirs)
        return concat(cache.query(package_ref_or_match_spec) for cache in caches)
138
139
    # ##########################################################################################
140
    # these class methods reach across all package cache directories (usually context.pkgs_dirs)
141
    # ##########################################################################################
142
143
    @classmethod
    def first_writable(cls, pkgs_dirs=None):
        """Return the first writable package cache, creating its directory when
        needed; raise NoWritablePkgsDirError if none qualifies.

        Calling this method may *create* a package cache directory, so callers
        should intend to write to the returned cache, not just read it.
        """
        if pkgs_dirs is None:
            pkgs_dirs = context.pkgs_dirs

        for pkgs_dir in pkgs_dirs:
            package_cache = cls(pkgs_dir)
            writable = package_cache.is_writable
            if writable is True:
                return package_cache
            elif writable is None:
                # The directory doesn't exist yet; attempt to create it.
                if create_package_cache_directory(package_cache.pkgs_dir):
                    package_cache.__is_writable = True
                    return package_cache

        raise NoWritablePkgsDirError(pkgs_dirs)
162
163
    @classmethod
    def writable_caches(cls, pkgs_dirs=None):
        """Return a tuple of the writable caches among *pkgs_dirs*."""
        if pkgs_dirs is None:
            pkgs_dirs = context.pkgs_dirs
        return tuple(cache for cache in (cls(pd) for pd in pkgs_dirs)
                     if cache.is_writable)
170
171
    @classmethod
    def read_only_caches(cls, pkgs_dirs=None):
        """Return a tuple of the non-writable caches among *pkgs_dirs*."""
        if pkgs_dirs is None:
            pkgs_dirs = context.pkgs_dirs
        return tuple(cache for cache in (cls(pd) for pd in pkgs_dirs)
                     if not cache.is_writable)
178
179
    @classmethod
    def all_caches_writable_first(cls, pkgs_dirs=None):
        """Return all caches as a tuple, writable ones ordered first.

        NOTE(review): caches whose directory does not exist report
        is_writable None and are omitted here — confirm that is intended.
        """
        if pkgs_dirs is None:
            pkgs_dirs = context.pkgs_dirs
        grouped = groupby(
            lambda cache: cache.is_writable,
            (cls(pd) for pd in pkgs_dirs)
        )
        return tuple(concatv(grouped.get(True, ()), grouped.get(False, ())))
188
189
    @classmethod
    def get_all_extracted_entries(cls):
        """Return every already-extracted record across all configured caches."""
        caches = (cls(pkgs_dir) for pkgs_dir in context.pkgs_dirs)
        return tuple(record for record in concat(map(itervalues, caches))
                     if record.is_extracted)
194
195
    @classmethod
    def get_entry_to_link(cls, package_ref):
        """Return an extracted cache record for *package_ref*, falling back to a
        channel-insensitive dist-name scan; raise CondaError when nothing matches.
        """
        pc_entry = next((pcrec for pcrec in cls.query_all(package_ref)
                         if pcrec.is_extracted),
                        None)
        if pc_entry is not None:
            return pc_entry

        # this can happen with `conda install path/to/package.tar.bz2`
        #   because dist has channel '<unknown>'
        # if ProgressiveFetchExtract did its job correctly, what we're looking for
        #   should be the matching dist_name in the first writable package cache
        # we'll search all caches for a match, but search writable caches first
        dist_str = package_ref.dist_str().rsplit(':', 1)[-1]
        # NOTE(review): this next() yields the scan result of the FIRST truthy
        # cache, which may itself be None even if a later cache has a match —
        # confirm that short-circuiting on the first cache is intended.
        pc_entry = next((cache._scan_for_dist_no_channel(dist_str)
                         for cache in cls.all_caches_writable_first() if cache), None)
        if pc_entry is not None:
            return pc_entry
        raise CondaError("No package '%s' found in cache directories." % package_ref.dist_str())
214
215
    @classmethod
    def tarball_file_in_cache(cls, tarball_path, md5sum=None, exclude_caches=()):
        """Search all configured caches (except *exclude_caches*) for a record
        matching *tarball_path* (and its md5); return the first hit or None."""
        tarball_full_path, md5sum = cls._clean_tarball_path_and_get_md5sum(
            tarball_path, md5sum)
        return first(cls(pkgs_dir).tarball_file_in_this_cache(tarball_full_path, md5sum)
                     for pkgs_dir in context.pkgs_dirs
                     if pkgs_dir not in exclude_caches)
223
224
    @classmethod
    def clear(cls):
        """Drop all memoized PackageCacheData instances (see PackageCacheType)."""
        cls._cache_.clear()
227
228
    def tarball_file_in_this_cache(self, tarball_path, md5sum=None):
        """Return the first record in this cache whose tarball basename and md5
        both match *tarball_path*, or None."""
        tarball_full_path, md5sum = self._clean_tarball_path_and_get_md5sum(
            tarball_path, md5sum=md5sum)
        tarball_basename = basename(tarball_full_path)
        return first(
            (rec for rec in itervalues(self)),
            key=lambda rec: rec.tarball_basename == tarball_basename and rec.md5 == md5sum,
        )
236
237
    @property
    def _package_cache_records(self):
        """Record index for this cache; populated lazily on first access."""
        # don't actually populate _package_cache_records until we need it
        if self.__package_cache_records is None:
            self.load()
        return self.__package_cache_records
243
244
    @property
    def is_writable(self):
        """True/False once probed; None when the directory does not exist."""
        # returns None if package cache directory does not exist / has not been created
        if self.__is_writable is NULL:
            # NULL sentinel: not yet probed; _check_writable caches the answer.
            return self._check_writable()
        return self.__is_writable
250
251
    def _check_writable(self):
        """Probe pkgs_dir for writability and cache the answer.

        Returns True/False when the cache's magic file exists, or None when the
        directory (or magic file) does not exist at all.
        """
        magic_file = join(self.pkgs_dir, PACKAGE_CACHE_MAGIC_FILE)
        if isfile(magic_file):
            i_wri = file_path_is_writable(magic_file)
            self.__is_writable = i_wri
            log.debug("package cache directory '%s' writable: %s", self.pkgs_dir, i_wri)
        else:
            log.trace("package cache directory '%s' does not exist", self.pkgs_dir)
            self.__is_writable = i_wri = None
        return i_wri
261
262
    @staticmethod
    def _clean_tarball_path_and_get_md5sum(tarball_path, md5sum=None):
        """Normalize *tarball_path* (file:// url or user path) to an absolute
        local path, computing the md5 when not supplied and the file exists."""
        if tarball_path.startswith('file:/'):
            tarball_path = url_to_path(tarball_path)
        tarball_full_path = expand(tarball_path)

        if md5sum is None and isfile(tarball_full_path):
            md5sum = compute_md5sum(tarball_full_path)

        return tarball_full_path, md5sum
272
273
    def _scan_for_dist_no_channel(self, dist_str):
        """Find a record whose dist string matches *dist_str* ignoring the
        channel prefix; return None when nothing matches."""
        for pcrec in self._package_cache_records:
            if pcrec.dist_str().rsplit(':', 1)[-1] == dist_str:
                return pcrec
        return None
277
278
    def itervalues(self):
        """Iterator over the cached records (dict-like protocol support)."""
        return iter(self.values())
280
281
    def values(self):
        """View of the cached PackageCacheRecord values."""
        return self._package_cache_records.values()
283
284
    def __repr__(self):
        """Debug representation, e.g. PackageCacheData(pkgs_dir='/opt/pkgs')."""
        return "%s(pkgs_dir=%r)" % (self.__class__.__name__, self.pkgs_dir)
287
288
    def _make_single_record(self, package_filename):
        """Build a PackageCacheRecord for one pkgs_dir entry (tarball or
        extracted directory).

        Tries info/repodata_record.json first, falls back to info/index.json,
        extracting (or re-extracting) the tarball when necessary.  Returns None
        when the entry is unusable; corrupt artifacts in writable caches are
        removed along the way.
        """
        if not package_filename.endswith(CONDA_TARBALL_EXTENSION):
            package_filename += CONDA_TARBALL_EXTENSION

        package_tarball_full_path = join(self.pkgs_dir, package_filename)
        log.trace("adding to package cache %s", package_tarball_full_path)
        extracted_package_dir = package_tarball_full_path[:-len(CONDA_TARBALL_EXTENSION)]

        # try reading info/repodata_record.json
        try:
            repodata_record = read_repodata_json(extracted_package_dir)
            package_cache_record = PackageCacheRecord.from_objects(
                repodata_record,
                package_tarball_full_path=package_tarball_full_path,
                extracted_package_dir=extracted_package_dir,
            )
            return package_cache_record
        except (IOError, OSError, JSONDecodeError) as e:
            # IOError / OSError if info/repodata_record.json doesn't exists
            # JsonDecodeError if info/repodata_record.json is partially extracted or corrupted
            #   python 2.7 raises ValueError instead of JsonDecodeError
            #   ValueError("No JSON object could be decoded")
            log.debug("unable to read %s\n  because %r",
                      join(extracted_package_dir, 'info', 'repodata_record.json'), e)

            # try reading info/index.json
            try:
                index_json_record = read_index_json(extracted_package_dir)
            except (IOError, OSError, JSONDecodeError) as e:
                # IOError / OSError if info/index.json doesn't exist
                # JsonDecodeError if info/index.json is partially extracted or corrupted
                #   python 2.7 raises ValueError instead of JsonDecodeError
                #   ValueError("No JSON object could be decoded")
                # BUG FIX: the format string was missing the %r placeholder for `e`,
                # so this call passed two args to a one-placeholder format and the
                # logging module reported a formatting error instead of the message.
                log.debug("unable to read %s\n  because %r",
                          join(extracted_package_dir, 'info', 'index.json'), e)

                if isdir(extracted_package_dir) and not isfile(package_tarball_full_path):
                    # We have a directory that looks like a conda package, but without
                    # (1) info/repodata_record.json or info/index.json, and (2) a conda package
                    # tarball, there's not much we can do.  We'll just ignore it.
                    return None

                try:
                    if self.is_writable:
                        if isdir(extracted_package_dir):
                            # We have a partially unpacked conda package directory. Best thing
                            # to do is remove it and try extracting.
                            rm_rf(extracted_package_dir)
                        try:
                            extract_tarball(package_tarball_full_path, extracted_package_dir)
                        except EnvironmentError as e:
                            if e.errno == ENOENT:
                                # FileNotFoundError(2, 'No such file or directory')
                                # At this point, we can assume the package tarball is bad.
                                # Remove everything and move on.
                                # see https://github.com/conda/conda/issues/6707
                                rm_rf(package_tarball_full_path)
                                rm_rf(extracted_package_dir)
                        try:
                            index_json_record = read_index_json(extracted_package_dir)
                        except (IOError, OSError, JSONDecodeError):
                            # At this point, we can assume the package tarball is bad.
                            # Remove everything and move on.
                            rm_rf(package_tarball_full_path)
                            rm_rf(extracted_package_dir)
                            return None
                    else:
                        index_json_record = read_index_json_from_tarball(package_tarball_full_path)
                except (EOFError, ReadError) as e:
                    # EOFError: Compressed file ended before the end-of-stream marker was reached
                    # tarfile.ReadError: file could not be opened successfully
                    # We have a corrupted tarball. Remove the tarball so it doesn't affect
                    # anything, and move on.
                    log.debug("unable to extract info/index.json from %s\n  because %r",
                              package_tarball_full_path, e)
                    rm_rf(package_tarball_full_path)
                    return None

            # we were able to read info/index.json, so let's continue
            if isfile(package_tarball_full_path):
                md5 = compute_md5sum(package_tarball_full_path)
            else:
                md5 = None

            url = self._urls_data.get_url(package_filename)
            package_cache_record = PackageCacheRecord.from_objects(
                index_json_record,
                url=url,
                md5=md5,
                package_tarball_full_path=package_tarball_full_path,
                extracted_package_dir=extracted_package_dir,
            )

            # write the info/repodata_record.json file so we can short-circuit this next time
            if self.is_writable:
                repodata_record = PackageRecord.from_objects(package_cache_record)
                repodata_record_path = join(extracted_package_dir, 'info', 'repodata_record.json')
                try:
                    write_as_json_to_file(repodata_record_path, repodata_record)
                except (IOError, OSError) as e:
                    if e.errno in (EACCES, EPERM) and isdir(dirname(repodata_record_path)):
                        raise NotWritableError(repodata_record_path, e.errno, caused_by=e)
                    else:
                        raise

            return package_cache_record
394
395
    @staticmethod
    def _dedupe_pkgs_dir_contents(pkgs_dir_contents):
        """Collapse directory/tarball pairs in a pkgs_dir listing.

        If both 'six-1.10.0-py35_0/' and 'six-1.10.0-py35_0.tar.bz2' are in
        pkgs_dir, only 'six-1.10.0-py35_0.tar.bz2' appears in the result.
        """
        # if both 'six-1.10.0-py35_0/' and 'six-1.10.0-py35_0.tar.bz2' are in pkgs_dir,
        #   only 'six-1.10.0-py35_0.tar.bz2' will be in the return contents
        if not pkgs_dir_contents:
            return []

        contents = []

        # After sorting, a directory name sorts immediately before its tarball
        # (name + extension); _process appends x unless its tarball twin follows.
        def _process(x, y):
            if x + CONDA_TARBALL_EXTENSION != y:
                contents.append(x)
            return y

        last = reduce(_process, sorted(pkgs_dir_contents))
        # Flush the final element; the second argument only needs to differ from
        # last + extension, which contents[-1] (or '') does.
        _process(last, contents and contents[-1] or '')
        return contents
412
413
414
class UrlsData(object):
    """In-memory view of a package cache's urls.txt, most recent url first.

    Behaves like a sequence of url strings.  Deliberately breaks the rule that
    all disk access goes through conda.gateways.
    """

    def __init__(self, pkgs_dir):
        self.pkgs_dir = pkgs_dir
        self.urls_txt_path = urls_txt_path = join(pkgs_dir, 'urls.txt')
        if isfile(urls_txt_path):
            with open(urls_txt_path, 'r') as fh:
                lines = [line.strip() for line in fh]
            lines.reverse()
            self._urls_data = lines
        else:
            self._urls_data = []

    def __contains__(self, url):
        return url in self._urls_data

    def __iter__(self):
        return iter(self._urls_data)

    def add_url(self, url):
        """Append *url* to urls.txt and record it as the most recent entry."""
        with open(self.urls_txt_path, 'a') as fh:
            fh.write(url + '\n')
        self._urls_data.insert(0, url)

    @memoizemethod
    def get_url(self, package_path):
        """Return the recorded url for a package path or basename, if any."""
        # package_path can be a full path or just a basename,
        #   for either an extracted directory or a tarball
        fn = basename(package_path)
        if not fn.endswith(CONDA_TARBALL_EXTENSION):
            fn += CONDA_TARBALL_EXTENSION
        return first(self, lambda url: basename(url) == fn)
448
449
450
# ##############################
451
# downloading
452
# ##############################
453
454
class ProgressiveFetchExtract(object):
455
456
    @staticmethod
    def make_actions_for_record(pref_or_spec):
        """Return a (cache_action, extract_action) pair for *pref_or_spec*.

        (None, None) when an extracted, md5-matching copy already exists;
        (None, extract) when a fetched tarball sits in a writable cache;
        (cache, extract) when the tarball must be linked from a read-only
        cache or downloaded into the first writable cache.
        """
        assert pref_or_spec is not None
        # returns a cache_action and extract_action

        # if the pref or spec has an md5 value
        # look in all caches for package cache record that is
        #   (1) already extracted, and
        #   (2) matches the md5
        # If one exists, no actions are needed.
        md5 = pref_or_spec.get('md5')
        if md5:
            extracted_pcrec = next((
                pcrec for pcrec in concat(PackageCacheData(pkgs_dir).query(pref_or_spec)
                                          for pkgs_dir in context.pkgs_dirs)
                if pcrec.is_extracted
            ), None)
            if extracted_pcrec:
                return None, None

        # there is no extracted dist that can work, so now we look for tarballs that
        #   aren't extracted
        # first we look in all writable caches, and if we find a match, we extract in place
        # otherwise, if we find a match in a non-writable cache, we link it to the first writable
        #   cache, and then extract
        first_writable_cache = PackageCacheData.first_writable()
        pcrec_from_writable_cache = next((
            pcrec for pcrec in concat(pcache.query(pref_or_spec)
                                      for pcache in PackageCacheData.writable_caches())
            if pcrec.is_fetched
        ), None)
        if pcrec_from_writable_cache:
            # extract in place
            extract_axn = ExtractPackageAction(
                source_full_path=pcrec_from_writable_cache.package_tarball_full_path,
                target_pkgs_dir=dirname(pcrec_from_writable_cache.package_tarball_full_path),
                target_extracted_dirname=basename(pcrec_from_writable_cache.extracted_package_dir),
                record_or_spec=pcrec_from_writable_cache,
                md5sum=pcrec_from_writable_cache.md5,
            )
            return None, extract_axn

        pcrec_from_read_only_cache = next((
            pcrec for pcrec in concat(pcache.query(pref_or_spec)
                                      for pcache in PackageCacheData.read_only_caches())
            if pcrec.is_fetched
        ), None)

        if pcrec_from_read_only_cache:
            # we found a tarball, but it's in a read-only package cache
            # we need to link the tarball into the first writable package cache,
            #   and then extract
            try:
                expected_size_in_bytes = pref_or_spec.size
            except AttributeError:
                # specs (as opposed to records) may not carry a size
                expected_size_in_bytes = None
            cache_axn = CacheUrlAction(
                url=path_to_url(pcrec_from_read_only_cache.package_tarball_full_path),
                target_pkgs_dir=first_writable_cache.pkgs_dir,
                target_package_basename=pcrec_from_read_only_cache.fn,
                md5sum=md5,
                expected_size_in_bytes=expected_size_in_bytes,
            )
            trgt_extracted_dirname = pcrec_from_read_only_cache.fn[:-len(CONDA_TARBALL_EXTENSION)]
            extract_axn = ExtractPackageAction(
                source_full_path=cache_axn.target_full_path,
                target_pkgs_dir=first_writable_cache.pkgs_dir,
                target_extracted_dirname=trgt_extracted_dirname,
                record_or_spec=pcrec_from_read_only_cache,
                md5sum=pcrec_from_read_only_cache.md5,
            )
            return cache_axn, extract_axn

        # if we got here, we couldn't find a matching package in the caches
        #   we'll have to download one; fetch and extract
        url = pref_or_spec.get('url')
        assert url
        try:
            expected_size_in_bytes = pref_or_spec.size
        except AttributeError:
            expected_size_in_bytes = None
        cache_axn = CacheUrlAction(
            url=url,
            target_pkgs_dir=first_writable_cache.pkgs_dir,
            target_package_basename=pref_or_spec.fn,
            md5sum=md5,
            expected_size_in_bytes=expected_size_in_bytes,
        )
        extract_axn = ExtractPackageAction(
            source_full_path=cache_axn.target_full_path,
            target_pkgs_dir=first_writable_cache.pkgs_dir,
            target_extracted_dirname=pref_or_spec.fn[:-len(CONDA_TARBALL_EXTENSION)],
            record_or_spec=pref_or_spec,
            md5sum=md5,
        )
        return cache_axn, extract_axn
552
553
    def __init__(self, link_prefs):
        """
        Args:
            link_prefs (Tuple[PackageRef]):
                A sequence of :class:`PackageRef`s to ensure available in a known
                package cache, typically for a follow-on :class:`UnlinkLinkTransaction`.
                Here, "available" means the package tarball is both downloaded and extracted
                to a package directory.
        """
        self.link_precs = link_prefs

        log.debug("instantiating ProgressiveFetchExtract with\n"
                  "  %s\n", '\n  '.join(pkg_rec.dist_str() for pkg_rec in link_prefs))

        self.paired_actions = odict()  # Map[pref, Tuple(CacheUrlAction, ExtractPackageAction)]

        # idempotency guards for prepare() and execute()
        self._prepared = False
        self._executed = False
571
572
    @time_recorder("fetch_extract_prepare")
    def prepare(self):
        """Compute a (cache_action, extract_action) pair for every link prec.

        Idempotent: a second call is a no-op.
        """
        if self._prepared:
            return

        for prec in self.link_precs:
            self.paired_actions[prec] = self.make_actions_for_record(prec)
        self._prepared = True
580
581
    @property
    def cache_actions(self):
        """Tuple of the non-None CacheUrlAction objects from paired_actions."""
        return tuple(axns[0] for axns in itervalues(self.paired_actions) if axns[0])
584
585
    @property
    def extract_actions(self):
        """Tuple of the non-None ExtractPackageAction objects from paired_actions."""
        return tuple(axns[1] for axns in itervalues(self.paired_actions) if axns[1])
588
589
    def execute(self):
        """Run all prepared cache (download) and extract actions.

        Per-package failures are collected and re-raised together as a
        CondaMultiError after every package has been attempted.  Idempotent
        once it completes successfully.
        """
        if self._executed:
            return
        if not self._prepared:
            self.prepare()

        assert not context.dry_run

        # BUG FIX: this was `or`, which returned early whenever EITHER list was
        # empty — e.g. when every package was already downloaded but still
        # needed extraction (cache_actions empty, extract_actions non-empty),
        # extraction was silently skipped.  There is nothing to do only when
        # BOTH lists are empty.
        if not self.cache_actions and not self.extract_actions:
            return

        if not context.verbosity and not context.quiet and not context.json:
            # TODO: use logger
            print("\nDownloading and Extracting Packages")
        else:
            log.debug("prepared package cache actions:\n"
                      "  cache_actions:\n"
                      "    %s\n"
                      "  extract_actions:\n"
                      "    %s\n",
                      '\n    '.join(text_type(ca) for ca in self.cache_actions),
                      '\n    '.join(text_type(ea) for ea in self.extract_actions))

        exceptions = []
        with signal_handler(conda_signal_handler), time_recorder("fetch_extract_execute"):
            for prec_or_spec, prec_actions in iteritems(self.paired_actions):
                exc = self._execute_actions(prec_or_spec, prec_actions)
                if exc:
                    log.debug('%r', exc, exc_info=True)
                    exceptions.append(exc)

        if exceptions:
            raise CondaMultiError(exceptions)
        self._executed = True
623
624
    @staticmethod
    def _execute_actions(prec_or_spec, actions):
        """Run one package's (cache_axn, extract_axn) pair with a progress bar.

        Returns the exception on failure (after reversing both actions), or
        None on success (after cleanup).  Exceptions are returned rather than
        raised so the caller can aggregate them across packages.
        """
        cache_axn, extract_axn = actions
        if cache_axn is None and extract_axn is None:
            # nothing to do for this package
            return

        desc = "%s-%s" % (prec_or_spec.name, prec_or_spec.version)
        if len(desc) > 20:
            desc = desc[:20]
        size = getattr(prec_or_spec, 'size', None)
        desc = "%-20s | %8s | " % (desc, size and human_bytes(size) or '')

        progress_bar = ProgressBar(desc, not context.verbosity and not context.quiet, context.json)

        download_total = 0.75  # fraction of progress for download; the rest goes to extract
        try:
            if cache_axn:
                cache_axn.verify()

                if not cache_axn.url.startswith('file:/'):
                    # The closure reads download_total late, so the reassignment
                    # in the else branch below cannot affect this code path.
                    def progress_update_cache_axn(pct_completed):
                        progress_bar.update_to(pct_completed * download_total)
                else:
                    # local file: no download phase, give all progress to extract
                    download_total = 0
                    progress_update_cache_axn = None

                cache_axn.execute(progress_update_cache_axn)

            if extract_axn:
                extract_axn.verify()

                def progress_update_extract_axn(pct_completed):
                    progress_bar.update_to((1 - download_total) * pct_completed + download_total)

                extract_axn.execute(progress_update_extract_axn)

        except Exception as e:
            # roll back in reverse order of execution, then report the failure
            if extract_axn:
                extract_axn.reverse()
            if cache_axn:
                cache_axn.reverse()
            return e
        else:
            if cache_axn:
                cache_axn.cleanup()
            if extract_axn:
                extract_axn.cleanup()
            progress_bar.finish()
        finally:
            progress_bar.close()
674
675
    def __hash__(self):
        # relies on link_precs being a hashable sequence (e.g. a tuple)
        return hash(self.link_precs)
677
678
    def __eq__(self, other):
        # NOTE(review): equality is defined as hash equality, so two distinct
        # instances could compare equal on a hash collision — confirm intended.
        return hash(self) == hash(other)
680
681
682
# ##############################
683
# backward compatibility
684
# ##############################
685
686
def rm_fetched(dist):
    """
    Checks to see if the requested package is in the cache; and if so, it removes both
    the package itself and its extracted contents.
    """
    # Kept only as a backward-compatibility stub: referenced from
    # conda/exports.py and conda_build/conda_interface.py, but not actually
    # used in conda-build.
    raise NotImplementedError()
694
695
696
def download(url, dst_path, session=None, md5=None, urlstxt=False, retries=3):
    """Backward-compatibility shim around the gateways download function.

    NOTE: the `session`, `urlstxt`, and `retries` arguments are accepted for
    interface compatibility but are ignored by this implementation.
    """
    from ..gateways.connection.download import download as gateway_download
    gateway_download(url, dst_path, md5)
699