Completed
Push — master ( 6cfd3b...95ab6f )
by Kale
64:30
created

test_simple_LinkPathAction_softlink()   F

Complexity

Conditions 9

Size

Total Lines 31

Duplication

Lines 31
Ratio 100 %

Importance

Changes 0
Metric Value
cc 9
c 0
b 0
f 0
dl 31
loc 31
rs 3
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import, division, print_function, unicode_literals
3
4
from logging import getLogger
5
from os.path import basename, dirname, isdir, isfile, join, lexists, getsize
6
from shlex import split as shlex_split
7
from subprocess import check_output
8
import sys
9
from tempfile import gettempdir
10
from unittest import TestCase
11
from uuid import uuid4
12
13
import pytest
14
15
from conda._vendor.auxlib.collection import AttrDict
16
from conda._vendor.toolz.itertoolz import groupby
17
from conda.base.constants import PREFIX_MAGIC_FILE
18
from conda.base.context import context, reset_context
19
from conda.common.compat import PY2, on_win
20
from conda.common.io import env_var
21
from conda.common.path import get_bin_directory_short_path, get_python_noarch_target_path, \
22
    get_python_short_path, get_python_site_packages_short_path, parse_entry_point_def, pyc_path, \
23
    win_path_ok
24
from conda.core.path_actions import CompilePycAction, CreatePythonEntryPointAction, LinkPathAction
25
from conda.exceptions import ParseError
26
from conda.gateways.disk.create import create_link, mkdir_p
27
from conda.gateways.disk.delete import rm_rf
28
from conda.gateways.disk.link import islink, stat_nlink
29
from conda.gateways.disk.permissions import is_executable
30
from conda.gateways.disk.read import compute_md5sum, compute_sha256sum
31
from conda.gateways.disk.test import softlink_supported
32
from conda.gateways.disk.update import touch
33
from conda.models.enums import LinkType, NoarchType, PathType
34
from conda.models.records import PathDataV1
35
36
try:
37
    from unittest.mock import Mock, patch
38
except ImportError:
39
    from mock import Mock, patch
40
41
log = getLogger(__name__)
42
43
44
def make_test_file(target_dir, suffix='', contents=''):
45
    if not isdir(target_dir):
46
        mkdir_p(target_dir)
47
    fn = str(uuid4())[:8]
48
    full_path = join(target_dir, fn + suffix)
49
    with open(full_path, 'w') as fh:
50
        fh.write(contents or str(uuid4()))
51
    return full_path
52
53
54
def load_python_file(py_file_full_path):
55
    if PY2:
56
        import imp
57
        return imp.load_compiled("module.name", py_file_full_path)
58
    elif sys.version_info < (3, 5):
59
        raise ParseError("this doesn't work for .pyc files")
60
        from importlib.machinery import SourceFileLoader
61
        return SourceFileLoader("module.name", py_file_full_path).load_module()
62
    else:
63
        import importlib.util
64
        spec = importlib.util.spec_from_file_location("module.name", py_file_full_path)
65
        module = importlib.util.module_from_spec(spec)
66
        spec.loader.exec_module(module)
67
        return module
68
69
70
class PathActionsTests(TestCase):
71
    def setUp(self):
72
        tempdirdir = gettempdir()
73
74
        prefix_dirname = str(uuid4())[:4] + ' ' + str(uuid4())[:4] + '-prefix'
75
        self.prefix = join(tempdirdir, prefix_dirname)
76
        mkdir_p(self.prefix)
77
        assert isdir(self.prefix)
78
79
        pkgs_dirname = str(uuid4())[:4] + ' ' + str(uuid4())[:4]
80
        self.pkgs_dir = join(tempdirdir, pkgs_dirname)
81
        mkdir_p(self.pkgs_dir)
82
        assert isdir(self.pkgs_dir)
83
84
    def tearDown(self):
85
        rm_rf(self.prefix)
86
        if not (on_win and PY2):
87
            # this assertion fails for the Softlink action windows tests
88
            # line 141 in backoff_rmdir
89
            #  exp_backoff_fn(rmtree, path, onerror=retry, max_tries=max_tries)
90
            # leaves a directory self.prefix\\Scripts that cannot be accessed or removed
91
            assert not lexists(self.prefix)
92
        rm_rf(self.pkgs_dir)
93
        assert not lexists(self.pkgs_dir)
94
95
    def test_CompilePycAction_generic(self):
96
        package_info = AttrDict(
97
            package_metadata=AttrDict(
98
                noarch=AttrDict(
99
                    type=NoarchType.generic))
100
        )
101
        noarch = package_info.package_metadata and package_info.package_metadata.noarch
102
        assert noarch.type == NoarchType.generic
103
        axns = CompilePycAction.create_actions({}, package_info, self.prefix, None, ())
104
        assert axns == ()
105
106
        package_info = AttrDict(package_metadata=None)
107
        axns = CompilePycAction.create_actions({}, package_info, self.prefix, None, ())
108
        assert axns == ()
109
110
    def test_CompilePycAction_noarch_python(self):
111
        if not softlink_supported(__file__, self.prefix) and on_win:
112
            pytest.skip("softlink not supported")
113
114
        target_python_version = '%d.%d' % sys.version_info[:2]
115
        sp_dir = get_python_site_packages_short_path(target_python_version)
116
        transaction_context = {
117
            'target_python_version': target_python_version,
118
            'target_site_packages_short_path': sp_dir,
119
        }
120
        package_info = AttrDict(package_metadata=AttrDict(noarch=AttrDict(type=NoarchType.python)))
121
122
        file_link_actions = [
123
            AttrDict(
124
                source_short_path='site-packages/something.py',
125
                target_short_path=get_python_noarch_target_path('site-packages/something.py', sp_dir),
126
            ),
127
            AttrDict(
128
                # this one shouldn't get compiled
129
                source_short_path='something.py',
130
                target_short_path=get_python_noarch_target_path('something.py', sp_dir),
131
            ),
132
        ]
133
        axns = CompilePycAction.create_actions(transaction_context, package_info, self.prefix,
134
                                               None, file_link_actions)
135
136
        assert len(axns) == 1
137
        axn = axns[0]
138
        assert axn.source_full_path == join(self.prefix, win_path_ok(get_python_noarch_target_path('site-packages/something.py', sp_dir)))
139
        assert axn.target_full_path == join(self.prefix, win_path_ok(pyc_path(get_python_noarch_target_path('site-packages/something.py', sp_dir),
140
                     target_python_version)))
141
142
        # make .py file in prefix that will be compiled
143
        mkdir_p(dirname(axn.source_full_path))
144
        with open(axn.source_full_path, 'w') as fh:
145
            fh.write("value = 42\n")
146
147
        # symlink the current python
148
        python_full_path = join(self.prefix, get_python_short_path(target_python_version))
149
        mkdir_p(dirname(python_full_path))
150
        create_link(sys.executable, python_full_path, LinkType.softlink)
151
152
        axn.execute()
153
        assert isfile(axn.target_full_path)
154
155
        # remove the source .py file so we're sure we're importing the pyc file below
156
        rm_rf(axn.source_full_path)
157
        assert not isfile(axn.source_full_path)
158
159
        if (3,) > sys.version_info >= (3, 5):
160
            # we're probably dropping py34 support soon enough anyway
161
            imported_pyc_file = load_python_file(axn.target_full_path)
162
            assert imported_pyc_file.value == 42
163
164
        axn.reverse()
165
        assert not isfile(axn.target_full_path)
166
167
    def test_CreatePythonEntryPointAction_generic(self):
168
        package_info = AttrDict(package_metadata=None)
169
        axns = CreatePythonEntryPointAction.create_actions({}, package_info, self.prefix, None)
170
        assert axns == ()
171
172
    def test_CreatePythonEntryPointAction_noarch_python(self):
173
        target_python_version = '%d.%d' % sys.version_info[:2]
174
        transaction_context = {
175
            'target_python_version': target_python_version,
176
        }
177
        package_info = AttrDict(package_metadata=AttrDict(noarch=AttrDict(
178
            type=NoarchType.python,
179
            entry_points=(
180
                'command1=some.module:main',
181
                'command2=another.somewhere:go',
182
            ),
183
        )))
184
185
        axns = CreatePythonEntryPointAction.create_actions(transaction_context, package_info,
186
                                                           self.prefix, LinkType.hardlink)
187
        grouped_axns = groupby(lambda ax: isinstance(ax, LinkPathAction), axns)
188
        windows_exe_axns = grouped_axns.get(True, ())
189
        assert len(windows_exe_axns) == (2 if on_win else 0)
190
        py_ep_axns = grouped_axns.get(False, ())
191
        assert len(py_ep_axns) == 2
192
193
        py_ep_axn = py_ep_axns[0]
194
195
        command, module, func = parse_entry_point_def('command1=some.module:main')
196
        assert command == 'command1'
197
        if on_win:
198
            target_short_path = "%s\\%s-script.py" % (get_bin_directory_short_path(), command)
199
        else:
200
            target_short_path = "%s/%s" % (get_bin_directory_short_path(), command)
201
        assert py_ep_axn.target_full_path == join(self.prefix, target_short_path)
202
        assert py_ep_axn.module == module == 'some.module'
203
        assert py_ep_axn.func == func == 'main'
204
205
        mkdir_p(dirname(py_ep_axn.target_full_path))
206
        py_ep_axn.execute()
207
        assert isfile(py_ep_axn.target_full_path)
208
        if not on_win:
209
            assert is_executable(py_ep_axn.target_full_path)
210
        with open(py_ep_axn.target_full_path) as fh:
211
            lines = fh.readlines()
212
            first_line = lines[0].strip()
213
            last_line = lines[-1].strip()
214
        if not on_win:
215
            python_full_path = join(self.prefix, get_python_short_path(target_python_version))
216
            assert first_line == "#!%s" % python_full_path
217
        assert last_line == "sys.exit(%s())" % func
218
219
        py_ep_axn.reverse()
220
        assert not isfile(py_ep_axn.target_full_path)
221
222
        if on_win:
223
            windows_exe_axn = windows_exe_axns[0]
224
            target_short_path = "%s\\%s.exe" % (get_bin_directory_short_path(), command)
225
            assert windows_exe_axn.target_full_path == join(self.prefix, target_short_path)
226
227
            mkdir_p(dirname(windows_exe_axn.target_full_path))
228
            windows_exe_axn.verify()
229
            windows_exe_axn.execute()
230
            assert isfile(windows_exe_axn.target_full_path)
231
            assert is_executable(windows_exe_axn.target_full_path)
232
233
            src = compute_md5sum(join(context.conda_prefix, 'Scripts/conda.exe'))
234
            assert src == compute_md5sum(windows_exe_axn.target_full_path)
235
236
            windows_exe_axn.reverse()
237
            assert not isfile(windows_exe_axn.target_full_path)
238
239 View Code Duplication
    def test_simple_LinkPathAction_hardlink(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
240
        source_full_path = make_test_file(self.pkgs_dir)
241
        target_short_path = source_short_path = basename(source_full_path)
242
243
        correct_sha256 = compute_sha256sum(source_full_path)
244
        correct_size_in_bytes = getsize(source_full_path)
245
        path_type = PathType.hardlink
246
247
        source_path_data = PathDataV1(
248
            _path = source_short_path,
249
            path_type=path_type,
250
            sha256=correct_sha256,
251
            size_in_bytes=correct_size_in_bytes,
252
        )
253
254
        axn = LinkPathAction({}, None, self.pkgs_dir, source_short_path, self.prefix,
255
                             target_short_path, LinkType.hardlink, source_path_data)
256
257
        assert axn.target_full_path == join(self.prefix, target_short_path)
258
        axn.verify()
259
        axn.execute()
260
        assert isfile(axn.target_full_path)
261
        assert not islink(axn.target_full_path)
262
        assert stat_nlink(axn.target_full_path) == 2
263
264
        axn.reverse()
265
        assert not lexists(axn.target_full_path)
266
267 View Code Duplication
    def test_simple_LinkPathAction_softlink(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
268
        if not softlink_supported(__file__, self.prefix) and on_win:
269
            pytest.skip("softlink not supported")
270
271
        source_full_path = make_test_file(self.pkgs_dir)
272
        target_short_path = source_short_path = basename(source_full_path)
273
274
        correct_sha256 = compute_sha256sum(source_full_path)
275
        correct_size_in_bytes = getsize(source_full_path)
276
        path_type = PathType.hardlink
277
278
        source_path_data = PathDataV1(
279
            _path = source_short_path,
280
            path_type=path_type,
281
            sha256=correct_sha256,
282
            size_in_bytes=correct_size_in_bytes,
283
        )
284
285
        axn = LinkPathAction({}, None, self.pkgs_dir, source_short_path, self.prefix,
286
                             target_short_path, LinkType.softlink, source_path_data)
287
288
        assert axn.target_full_path == join(self.prefix, target_short_path)
289
        axn.verify()
290
        axn.execute()
291
        assert isfile(axn.target_full_path)
292
        assert islink(axn.target_full_path)
293
        assert stat_nlink(axn.target_full_path) == 1
294
295
        axn.reverse()
296
        assert not lexists(axn.target_full_path)
297
        assert lexists(source_full_path)
298
299
    def test_simple_LinkPathAction_directory(self):
300
        target_short_path = join('a', 'nested', 'directory')
301
        axn = LinkPathAction({}, None, None, None, self.prefix,
302
                             target_short_path, LinkType.directory, None)
303
        axn.verify()
304
        axn.execute()
305
306
        assert isdir(join(self.prefix, target_short_path))
307
308
        axn.reverse()
309
        assert not lexists(axn.target_full_path)
310
        assert not lexists(dirname(axn.target_full_path))
311
        assert not lexists(dirname(dirname(axn.target_full_path)))
312
313 View Code Duplication
    def test_simple_LinkPathAction_copy(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
314
        source_full_path = make_test_file(self.pkgs_dir)
315
        target_short_path = source_short_path = basename(source_full_path)
316
317
        correct_sha256 = compute_sha256sum(source_full_path)
318
        correct_size_in_bytes = getsize(source_full_path)
319
        path_type = PathType.hardlink
320
321
        source_path_data = PathDataV1(
322
            _path = source_short_path,
323
            path_type=path_type,
324
            sha256=correct_sha256,
325
            size_in_bytes=correct_size_in_bytes,
326
        )
327
328
        axn = LinkPathAction({}, None, self.pkgs_dir, source_short_path, self.prefix,
329
                             target_short_path, LinkType.copy, source_path_data)
330
331
        assert axn.target_full_path == join(self.prefix, target_short_path)
332
        axn.verify()
333
        axn.execute()
334
        assert isfile(axn.target_full_path)
335
        assert not islink(axn.target_full_path)
336
        assert stat_nlink(axn.target_full_path) == 1
337
338
        axn.reverse()
339
        assert not lexists(axn.target_full_path)
340
341
    # @pytest.mark.skipif(on_win, reason="unix-only test")
342
    # def test_CreateApplicationSoftlinkAction_basic_symlink_unix(self):
343
    #     from conda.core.path_actions import CreateApplicationSoftlinkAction
344
    #
345
    #     source_prefix = join(self.prefix, 'envs', '_yellow_')
346
    #     test_file = make_test_file(join(source_prefix, 'bin'), suffix='', contents='echo yellow')
347
    #     test_file = test_file[len(source_prefix) + 1:]
348
    #
349
    #     assert check_output(
350
    #         shlex_split("sh -c '. \"%s\"'" % join(source_prefix, test_file))).strip() == b"yellow"
351
    #
352
    #     package_info = AttrDict(
353
    #         index_json_record=AttrDict(name="yellow_package"),
354
    #         repodata_record=AttrDict(preferred_env="yellow"),
355
    #         package_metadata=AttrDict(
356
    #             preferred_env=AttrDict(
357
    #                 softlink_paths=[
358
    #                     test_file,
359
    #                 ]
360
    #             )
361
    #         ),
362
    #     )
363
    #     target_full_path = join(self.prefix, test_file)
364
    #     mkdir_p(join(dirname(target_full_path)))
365
    #
366
    #     with env_var("CONDA_ROOT_PREFIX", self.prefix, reset_context):
367
    #         axns = CreateApplicationSoftlinkAction.create_actions({}, package_info, source_prefix,
368
    #                                                               None)
369
    #         assert len(axns) == 1
370
    #         axn = axns[0]
371
    #
372
    #         assert axn.target_full_path == target_full_path
373
    #         axn.verify()
374
    #         axn.execute()
375
    #         assert islink(axn.target_full_path)
376
    #         assert check_output(shlex_split("sh -c '. \"%s\"'" % axn.target_full_path)).strip() == b"yellow"
377
    #         axn.reverse()
378
    #         assert not lexists(axn.target_full_path)
379
380
    # @pytest.mark.skipif(not on_win, reason="windows-only test")
381
    # def test_CreateApplicationSoftlinkAction_basic_symlink_windows_not_supported(self):
382
    #
383
    #     source_prefix = join(self.prefix, 'envs', '_green_')
384
    #     mkdir_p(join(source_prefix, 'conda-meta'))
385
    #     touch(join(source_prefix, 'conda-meta', 'history'))
386
    #
387
    #     test_file_1 = make_test_file(join(source_prefix, 'Scripts'), suffix='',
388
    #                                  contents='echo red')
389
    #     test_file_1 = test_file_1[len(source_prefix) + 1:]
390
    #     assert check_output(shlex_split("sh -c '. \"%s\"'" % join(source_prefix, test_file_1))).strip() == b"red"
391
    #
392
    #     test_file_2 = make_test_file(join(source_prefix, 'Scripts'), suffix='.bat',
393
    #                                  contents='@echo off\necho blue')
394
    #     test_file_2 = test_file_2[len(source_prefix) + 1:]
395
    #     assert check_output(shlex_split("cmd /C \"%s\"" % join(source_prefix, test_file_2))).strip() == b"blue"
396
    #
397
    #     package_info = AttrDict(
398
    #         index_json_record=AttrDict(name="green_package"),
399
    #         repodata_record=AttrDict(preferred_env="green"),
400
    #         package_metadata=AttrDict(
401
    #             preferred_env=AttrDict(
402
    #                 softlink_paths=[
403
    #                     test_file_1,
404
    #                     test_file_2,
405
    #                 ]
406
    #             )
407
    #         ),
408
    #     )
409
    #     target_full_path_1 = join(self.prefix, test_file_1)
410
    #     target_full_path_2 = join(self.prefix, test_file_2)
411
    #     mkdir_p(join(dirname(target_full_path_1)))
412
    #
413
    #     with env_var("CONDA_ROOT_PREFIX", self.prefix, reset_context):
414
    #         softlink_supported_test_file = join(source_prefix, PREFIX_MAGIC_FILE)
415
    #         from conda.gateways.disk.test import softlink_supported
416
    #         softlink_actually_supported = softlink_supported(softlink_supported_test_file,
417
    #                                                          context.root_prefix)
418
    #
419
    #         import conda.core.path_actions
420
    #         with patch.object(conda.core.path_actions, "softlink_supported") as softlink_supported_mock:
421
    #             softlink_supported_mock.return_value = False
422
    #
423
    #             from conda.core.path_actions import CreateApplicationSoftlinkAction
424
    #             axns = CreateApplicationSoftlinkAction.create_actions({}, package_info,
425
    #                                                                   source_prefix, None)
426
    #             assert len(axns) == 2
427
    #
428
    #             axn = axns[0]
429
    #             assert axn.target_full_path == target_full_path_1
430
    #             assert axn.softlink_method == "softlink_or_fail_ok"
431
    #             axn.verify()
432
    #             axn.execute()
433
    #             if softlink_actually_supported:
434
    #                 assert islink(axn.target_full_path)
435
    #                 assert check_output(shlex_split("sh -c '. \"%s\"'" % axn.target_full_path)).strip() == b"red"
436
    #             else:
437
    #                 assert not lexists(axn.target_full_path)
438
    #             axn.reverse()
439
    #             assert not lexists(axn.target_full_path)
440
    #
441
    #             axn = axns[1]
442
    #             assert axn.target_full_path == target_full_path_2
443
    #             assert axn.softlink_method == "fake_exe_softlink"
444
    #             axn.verify()
445
    #             axn.execute()
446
    #             assert isfile(axn.target_full_path)
447
    #             assert check_output(shlex_split("cmd /C \"%s\"" % axn.target_full_path)).strip() == b"blue"
448
    #             axn.reverse()
449
    #             assert not lexists(axn.target_full_path)
450
451
    # @pytest.mark.skipif(not on_win, reason="windows-only test")
452
    # def test_CreateApplicationSoftlinkAction_basic_symlink_windows_supported(self):
453
    #     source_prefix = join(self.prefix, 'envs', '_green_')
454
    #     mkdir_p(join(source_prefix, 'conda-meta'))
455
    #     touch(join(source_prefix, 'conda-meta', 'history'))
456
    #
457
    #     test_file_1 = make_test_file(join(source_prefix, 'Scripts'), suffix='',
458
    #                                  contents='echo red')
459
    #     test_file_1 = test_file_1[len(source_prefix) + 1:]
460
    #     assert check_output(shlex_split(
461
    #         "sh -c '. \"%s\"'" % join(source_prefix, test_file_1))).strip() == b"red"
462
    #
463
    #     test_file_2 = make_test_file(join(source_prefix, 'Scripts'), suffix='.bat',
464
    #                                  contents='@echo off\necho blue')
465
    #     test_file_2 = test_file_2[len(source_prefix) + 1:]
466
    #     assert check_output(shlex_split("cmd /C \"%s\"" % join(source_prefix, test_file_2))).strip() == b"blue"
467
    #
468
    #     package_info = AttrDict(
469
    #         index_json_record=AttrDict(name="green_package"),
470
    #         repodata_record=AttrDict(preferred_env="green"),
471
    #         package_metadata=AttrDict(
472
    #             preferred_env=AttrDict(
473
    #                 softlink_paths=[
474
    #                     test_file_1,
475
    #                     test_file_2,
476
    #                 ]
477
    #             )
478
    #         ),
479
    #     )
480
    #     target_full_path_1 = join(self.prefix, test_file_1)
481
    #     target_full_path_2 = join(self.prefix, test_file_2)
482
    #     mkdir_p(join(dirname(target_full_path_1)))
483
    #
484
    #     with env_var("CONDA_ROOT_PREFIX", self.prefix, reset_context):
485
    #         softlink_supported_test_file = join(source_prefix, PREFIX_MAGIC_FILE)
486
    #         from conda.gateways.disk.test import softlink_supported
487
    #         softlink_actually_supported = softlink_supported(softlink_supported_test_file,
488
    #                                                          context.root_prefix)
489
    #
490
    #         import conda.core.path_actions
491
    #         with patch.object(conda.core.path_actions, "softlink_supported") as softlink_supported_mock:
492
    #             softlink_supported_mock.return_value = True
493
    #
494
    #             CreateApplicationSoftlinkAction = conda.core.path_actions.CreateApplicationSoftlinkAction
495
    #             axns = CreateApplicationSoftlinkAction.create_actions({}, package_info,
496
    #                                                                   source_prefix, None)
497
    #             assert len(axns) == 2
498
    #
499
    #             axn = axns[0]
500
    #             assert axn.target_full_path == target_full_path_1
501
    #             assert axn.softlink_method == "softlink"
502
    #             axn.verify()
503
    #             if softlink_actually_supported:
504
    #                 axn.execute()
505
    #                 assert islink(axn.target_full_path)
506
    #                 assert check_output(shlex_split("sh -c '. \"%s\"'" % axn.target_full_path)).strip() == b"red"
507
    #             else:
508
    #                 with pytest.raises(AssertionError):
509
    #                     axn.execute()
510
    #             axn.reverse()
511
    #             assert not lexists(axn.target_full_path)
512
    #
513
    #             axn = axns[1]
514
    #             assert axn.target_full_path == target_full_path_2
515
    #             assert axn.softlink_method == "softlink"
516
    #             axn.verify()
517
    #             if softlink_actually_supported:
518
    #                 axn.execute()
519
    #                 assert isfile(axn.target_full_path)
520
    #                 assert check_output(shlex_split("cmd /C \"%s\"" % axn.target_full_path)).strip() == b"blue"
521
    #             else:
522
    #                 with pytest.raises(AssertionError):
523
    #                     axn.execute()
524
    #             axn.reverse()
525
    #             assert not lexists(axn.target_full_path)
526