Completed
Push — master ( 616cf9...082fe8 )
by Kale
62:33
created

_Activator._get_path_dirs2()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 10
rs 9.4285
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2012 Anaconda, Inc
3
# SPDX-License-Identifier: BSD-3-Clause
4
from __future__ import absolute_import, division, print_function, unicode_literals
5
6
from errno import ENOENT
7
from glob import glob
8
import os
9
from os.path import abspath, basename, dirname, expanduser, expandvars, isdir, join, normcase
10
import re
11
import sys
12
from tempfile import NamedTemporaryFile
13
14
from . import CONDA_PACKAGE_ROOT, CondaError
15
from .base.context import ROOT_ENV_NAME, context, locate_prefix_by_name
16
17
try:
18
    from cytoolz.itertoolz import concatv, drop
19
except ImportError:  # pragma: no cover
20
    from ._vendor.toolz.itertoolz import concatv, drop  # NOQA
21
22
23
class _Activator(object):
24
    # Activate and deactivate have three tasks
25
    #   1. Set and unset environment variables
26
    #   2. Execute/source activate.d/deactivate.d scripts
27
    #   3. Update the command prompt
28
    #
29
    # Shells should also use 'reactivate' following conda's install, update, and
30
    #   remove/uninstall commands.
31
    #
32
    # All core logic is in build_activate() or build_deactivate(), and is independent of
33
    # shell type.  Each returns a map containing the keys:
34
    #   export_vars
35
    #   unset_var
36
    #   activate_scripts
37
    #   deactivate_scripts
38
    #
39
    # The value of the CONDA_PROMPT_MODIFIER environment variable holds conda's contribution
40
    #   to the command prompt.
41
    #
42
    # To implement support for a new shell, ideally one would only need to add shell-specific
43
    # information to the __init__ method of this class.
44
45
    # The following instance variables must be defined by each implementation.
46
    pathsep_join = None
47
    sep = None
48
    path_conversion = None
49
    script_extension = None
50
    tempfile_extension = None  # None means write instructions to stdout rather than a temp file
51
    command_join = None
52
53
    unset_var_tmpl = None
54
    export_var_tmpl = None
55
    set_var_tmpl = None
56
    run_script_tmpl = None
57
58
    hook_source_path = None
59
60
    def __init__(self, arguments=None):
61
        self._raw_arguments = arguments
62
63
        if PY2:
64
            self.environ = {ensure_fs_path_encoding(k): ensure_fs_path_encoding(v)
65
                            for k, v in iteritems(os.environ)}
66
        else:
67
            self.environ = os.environ.copy()
68
69
    def _finalize(self, commands, ext):
70
        commands = concatv(commands, ('',))  # add terminating newline
71
        if ext is None:
72
            return self.command_join.join(commands)
73
        elif ext:
74
            with NamedTemporaryFile('w+b', suffix=ext, delete=False) as tf:
75
                # the default mode is 'w+b', and universal new lines don't work in that mode
76
                # command_join should account for that
77
                tf.write(ensure_binary(self.command_join.join(commands)))
78
            return tf.name
79
        else:
80
            raise NotImplementedError()
81
82
    def activate(self):
83
        if self.stack:
84
            builder_result = self.build_stack(self.env_name_or_prefix)
85
        else:
86
            builder_result = self.build_activate(self.env_name_or_prefix)
87
        return self._finalize(self._yield_commands(builder_result), self.tempfile_extension)
88
89
    def deactivate(self):
90
        return self._finalize(self._yield_commands(self.build_deactivate()),
91
                              self.tempfile_extension)
92
93
    def reactivate(self):
94
        return self._finalize(self._yield_commands(self.build_reactivate()),
95
                              self.tempfile_extension)
96
97
    def hook(self, auto_activate_base=None):
98
        builder = []
99
        builder.append(self._hook_preamble())
100
        with open(self.hook_source_path) as fsrc:
101
            builder.append(fsrc.read())
102
        if auto_activate_base is None and context.auto_activate_base or auto_activate_base:
103
            builder.append("conda activate base\n")
104
        return "\n".join(builder)
105
106
    def execute(self):
107
        # return value meant to be written to stdout
108
        self._parse_and_set_args(self._raw_arguments)
109
        return getattr(self, self.command)()
110
111
    def _hook_preamble(self):
112
        # must be implemented in subclass
113
        raise NotImplementedError()
114
115
    def _parse_and_set_args(self, arguments):
116
        # the first index of arguments MUST be either activate, deactivate, or reactivate
117
        if arguments is None:
118
            from .exceptions import ArgumentError
119
            raise ArgumentError("'activate', 'deactivate', or 'reactivate' command must be given")
120
121
        command = arguments[0]
122
        arguments = tuple(drop(1, arguments))
123
        help_flags = ('-h', '--help', '/?')
124
        non_help_args = tuple(arg for arg in arguments if arg not in help_flags)
125
        help_requested = len(arguments) != len(non_help_args)
126
        remainder_args = list(arg for arg in non_help_args if arg and arg != command)
127
128
        if not command:
129
            from .exceptions import ArgumentError
130
            raise ArgumentError("'activate', 'deactivate', 'hook', or 'reactivate' "
131
                                "command must be given")
132
        elif help_requested:
133
            from .exceptions import ActivateHelp, DeactivateHelp, GenericHelp
134
            help_classes = {
135
                'activate': ActivateHelp(),
136
                'deactivate': DeactivateHelp(),
137
                'hook': GenericHelp('hook'),
138
                'reactivate': GenericHelp('reactivate'),
139
            }
140
            raise help_classes[command]
141
        elif command not in ('activate', 'deactivate', 'reactivate', 'hook'):
142
            from .exceptions import ArgumentError
143
            raise ArgumentError("invalid command '%s'" % command)
144
145
        if command == 'activate':
146
            try:
147
                stack_idx = remainder_args.index('--stack')
148
            except ValueError:
149
                self.stack = False
150
            else:
151
                del remainder_args[stack_idx]
152
                self.stack = True
153
            if len(remainder_args) > 1:
154
                from .exceptions import ArgumentError
155
                raise ArgumentError(command + ' does not accept more than one argument:\n'
156
                                    + str(remainder_args) + '\n')
157
            self.env_name_or_prefix = remainder_args and remainder_args[0] or 'base'
158
159
        else:
160
            if remainder_args:
161
                from .exceptions import ArgumentError
162
                raise ArgumentError('%s does not accept arguments\nremainder_args: %s\n'
163
                                    % (command, remainder_args))
164
165
        self.command = command
166
167
    def _yield_commands(self, cmds_dict):
168
        for script in cmds_dict.get('deactivate_scripts', ()):
169
            yield self.run_script_tmpl % script
170
171
        for key in sorted(cmds_dict.get('unset_vars', ())):
172
            yield self.unset_var_tmpl % key
173
174
        for key, value in sorted(iteritems(cmds_dict.get('set_vars', {}))):
175
            yield self.set_var_tmpl % (key, value)
176
177
        for key, value in sorted(iteritems(cmds_dict.get('export_vars', {}))):
178
            yield self.export_var_tmpl % (key, value)
179
180
        for script in cmds_dict.get('activate_scripts', ()):
181
            yield self.run_script_tmpl % script
182
183
    def build_activate(self, env_name_or_prefix):
184
        return self._build_activate_stack(env_name_or_prefix, False)
185
186
    def build_stack(self, env_name_or_prefix):
187
        return self._build_activate_stack(env_name_or_prefix, True)
188
189
    def _build_activate_stack(self, env_name_or_prefix, stack):
190
        if re.search(r'\\|/', env_name_or_prefix):
191
            prefix = expand(env_name_or_prefix)
192
            if not isdir(join(prefix, 'conda-meta')):
193
                from .exceptions import EnvironmentLocationNotFound
194
                raise EnvironmentLocationNotFound(prefix)
195
        elif env_name_or_prefix in (ROOT_ENV_NAME, 'root'):
196
            prefix = context.root_prefix
197
        else:
198
            prefix = locate_prefix_by_name(env_name_or_prefix)
199
200
        # query environment
201
        old_conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
202
        new_conda_shlvl = old_conda_shlvl + 1
203
        old_conda_prefix = self.environ.get('CONDA_PREFIX')
204
205
        if old_conda_prefix == prefix and old_conda_shlvl > 0:
206
            return self.build_reactivate()
207
208
        activate_scripts = self._get_activate_scripts(prefix)
209
        conda_default_env = self._default_env(prefix)
210
        conda_prompt_modifier = self._prompt_modifier(prefix, conda_default_env)
211
212
        if old_conda_shlvl == 0:
213
            new_path = self.pathsep_join(self._add_prefix_to_path(prefix))
214
            conda_python_exe = self.path_conversion(sys.executable)
215
            conda_exe = self.path_conversion(context.conda_exe)
216
            export_vars = {
217
                'CONDA_PYTHON_EXE': conda_python_exe,
218
                'CONDA_EXE': conda_exe,
219
                'PATH': new_path,
220
                'CONDA_PREFIX': prefix,
221
                'CONDA_SHLVL': new_conda_shlvl,
222
                'CONDA_DEFAULT_ENV': conda_default_env,
223
                'CONDA_PROMPT_MODIFIER': conda_prompt_modifier,
224
            }
225
            deactivate_scripts = ()
226
        else:
227
            if self.environ.get('CONDA_PREFIX_%s' % (old_conda_shlvl - 1)) == prefix:
228
                # in this case, user is attempting to activate the previous environment,
229
                #  i.e. step back down
230
                return self.build_deactivate()
231
            if stack:
232
                new_path = self.pathsep_join(self._add_prefix_to_path(prefix))
233
                export_vars = {
234
                    'PATH': new_path,
235
                    'CONDA_PREFIX': prefix,
236
                    'CONDA_PREFIX_%d' % old_conda_shlvl: old_conda_prefix,
237
                    'CONDA_SHLVL': new_conda_shlvl,
238
                    'CONDA_DEFAULT_ENV': conda_default_env,
239
                    'CONDA_PROMPT_MODIFIER': conda_prompt_modifier,
240
                    'CONDA_STACKED_%d' % new_conda_shlvl: 'true',
241
                }
242
                deactivate_scripts = ()
243
            else:
244
                new_path = self.pathsep_join(
245
                    self._replace_prefix_in_path(old_conda_prefix, prefix)
246
                )
247
                export_vars = {
248
                    'PATH': new_path,
249
                    'CONDA_PREFIX': prefix,
250
                    'CONDA_PREFIX_%d' % old_conda_shlvl: old_conda_prefix,
251
                    'CONDA_SHLVL': new_conda_shlvl,
252
                    'CONDA_DEFAULT_ENV': conda_default_env,
253
                    'CONDA_PROMPT_MODIFIER': conda_prompt_modifier,
254
                }
255
                deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
256
257
        set_vars = {}
258
        if context.changeps1:
259
            self._update_prompt(set_vars, conda_prompt_modifier)
260
261
        self._build_activate_shell_custom(export_vars)
262
263
        return {
264
            'unset_vars': (),
265
            'set_vars': set_vars,
266
            'export_vars': export_vars,
267
            'deactivate_scripts': deactivate_scripts,
268
            'activate_scripts': activate_scripts,
269
        }
270
271
    def build_deactivate(self):
272
        # query environment
273
        old_conda_prefix = self.environ.get('CONDA_PREFIX')
274
        old_conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
275
        if not old_conda_prefix or old_conda_shlvl < 1:
276
            # no active environment, so cannot deactivate; do nothing
277
            return {
278
                'unset_vars': (),
279
                'set_vars': {},
280
                'export_vars': {},
281
                'deactivate_scripts': (),
282
                'activate_scripts': (),
283
            }
284
        deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
285
286
        new_conda_shlvl = old_conda_shlvl - 1
287
        set_vars = {}
288
        if old_conda_shlvl == 1:
289
            new_path = self.pathsep_join(self._remove_prefix_from_path(old_conda_prefix))
290
            conda_prompt_modifier = ''
291
            unset_vars = (
292
                'CONDA_PREFIX',
293
                'CONDA_DEFAULT_ENV',
294
                'CONDA_PYTHON_EXE',
295
                'CONDA_PROMPT_MODIFIER',
296
            )
297
            export_vars = {
298
                'PATH': new_path,
299
                'CONDA_SHLVL': new_conda_shlvl,
300
            }
301
            activate_scripts = ()
302
        else:
303
            assert old_conda_shlvl > 1
304
            new_prefix = self.environ.get('CONDA_PREFIX_%d' % new_conda_shlvl)
305
            conda_default_env = self._default_env(new_prefix)
306
            conda_prompt_modifier = self._prompt_modifier(new_prefix, conda_default_env)
307
308
            old_prefix_stacked = 'CONDA_STACKED_%d' % old_conda_shlvl in self.environ
309
            if old_prefix_stacked:
310
                new_path = self.pathsep_join(self._remove_prefix_from_path(old_conda_prefix))
311
                unset_vars = (
312
                    'CONDA_PREFIX_%d' % new_conda_shlvl,
313
                    'CONDA_STACKED_%d' % old_conda_shlvl,
314
                )
315
            else:
316
                new_path = self.pathsep_join(
317
                    self._replace_prefix_in_path(old_conda_prefix, new_prefix)
318
                )
319
                unset_vars = (
320
                    'CONDA_PREFIX_%d' % new_conda_shlvl,
321
                )
322
323
            export_vars = {
324
                'PATH': new_path,
325
                'CONDA_SHLVL': new_conda_shlvl,
326
                'CONDA_PREFIX': new_prefix,
327
                'CONDA_DEFAULT_ENV': conda_default_env,
328
                'CONDA_PROMPT_MODIFIER': conda_prompt_modifier,
329
            }
330
            activate_scripts = self._get_activate_scripts(new_prefix)
331
332
        if context.changeps1:
333
            self._update_prompt(set_vars, conda_prompt_modifier)
334
335
        return {
336
            'unset_vars': unset_vars,
337
            'set_vars': set_vars,
338
            'export_vars': export_vars,
339
            'deactivate_scripts': deactivate_scripts,
340
            'activate_scripts': activate_scripts,
341
        }
342
343
    def build_reactivate(self):
344
        conda_prefix = self.environ.get('CONDA_PREFIX')
345
        conda_shlvl = int(self.environ.get('CONDA_SHLVL', '').strip() or 0)
346
        if not conda_prefix or conda_shlvl < 1:
347
            # no active environment, so cannot reactivate; do nothing
348
            return {
349
                'unset_vars': (),
350
                'set_vars': {},
351
                'export_vars': {},
352
                'deactivate_scripts': (),
353
                'activate_scripts': (),
354
            }
355
        conda_default_env = self.environ.get('CONDA_DEFAULT_ENV', self._default_env(conda_prefix))
356
        new_path = self.pathsep_join(self._replace_prefix_in_path(conda_prefix, conda_prefix))
357
        set_vars = {}
358
359
        conda_prompt_modifier = self._prompt_modifier(conda_prefix, conda_default_env)
360
        if context.changeps1:
361
            self._update_prompt(set_vars, conda_prompt_modifier)
362
363
        # environment variables are set only to aid transition from conda 4.3 to conda 4.4
364
        return {
365
            'unset_vars': (),
366
            'set_vars': set_vars,
367
            'export_vars': {
368
                'PATH': new_path,
369
                'CONDA_SHLVL': conda_shlvl,
370
                'CONDA_PROMPT_MODIFIER': self._prompt_modifier(conda_prefix, conda_default_env),
371
            },
372
            'deactivate_scripts': self._get_deactivate_scripts(conda_prefix),
373
            'activate_scripts': self._get_activate_scripts(conda_prefix),
374
        }
375
376
    def _get_starting_path_list(self):
377
        path = self.environ['PATH']
378
        if on_win:
379
            # On Windows, the Anaconda Python interpreter prepends sys.prefix\Library\bin on
380
            # startup. It's a hack that allows users to avoid using the correct activation
381
            # procedure; a hack that needs to go away because it doesn't add all the paths.
382
            # See: https://github.com/AnacondaRecipes/python-feedstock/blob/master/recipe/0005-Win32-Ensure-Library-bin-is-in-os.environ-PATH.patch  # NOQA
383
            # But, we now detect if that has happened because:
384
            #   1. In future we would like to remove this hack and require real activation.
385
            #   2. We should not assume that the Anaconda Python interpreter is being used.
386
            path_split = path.split(os.pathsep)
387
            library_bin = r"%s\Library\bin" % (sys.prefix)
388
            # ^^^ deliberately the same as: https://github.com/AnacondaRecipes/python-feedstock/blob/8e8aee4e2f4141ecfab082776a00b374c62bb6d6/recipe/0005-Win32-Ensure-Library-bin-is-in-os.environ-PATH.patch#L20  # NOQA
389
            if paths_equal(path_split[0], library_bin):
390
                return path_split[1:]
391
            else:
392
                return path_split
393
        else:
394
            return path.split(os.pathsep)
395
396
    @staticmethod
397
    def _get_path_dirs(prefix):
398
        if on_win:  # pragma: unix no cover
399
            yield prefix.rstrip("\\")
400
            yield join(prefix, 'Library', 'mingw-w64', 'bin')
401
            yield join(prefix, 'Library', 'usr', 'bin')
402
            yield join(prefix, 'Library', 'bin')
403
            yield join(prefix, 'Scripts')
404
            yield join(prefix, 'bin')
405
        else:
406
            yield join(prefix, 'bin')
407
408
    def _get_path_dirs2(self, prefix):
409
        if on_win:  # pragma: unix no cover
410
            yield prefix
411
            yield self.sep.join((prefix, 'Library', 'mingw-w64', 'bin'))
412
            yield self.sep.join((prefix, 'Library', 'usr', 'bin'))
413
            yield self.sep.join((prefix, 'Library', 'bin'))
414
            yield self.sep.join((prefix, 'Scripts'))
415
            yield self.sep.join((prefix, 'bin'))
416
        else:
417
            yield self.sep.join((prefix, 'bin'))
418
419
    def _add_prefix_to_path(self, prefix, starting_path_dirs=None):
420
        prefix = self.path_conversion(prefix)
421
        if starting_path_dirs is None:
422
            path_list = list(self.path_conversion(self._get_starting_path_list()))
423
        else:
424
            path_list = list(self.path_conversion(starting_path_dirs))
425
        path_list[0:0] = list(self._get_path_dirs2(prefix))
426
        return tuple(path_list)
427
428
    def _remove_prefix_from_path(self, prefix, starting_path_dirs=None):
429
        return self._replace_prefix_in_path(prefix, None, starting_path_dirs)
430
431
    def _replace_prefix_in_path(self, old_prefix, new_prefix, starting_path_dirs=None):
432
        old_prefix = self.path_conversion(old_prefix)
433
        new_prefix = self.path_conversion(new_prefix)
434
        if starting_path_dirs is None:
435
            path_list = list(self.path_conversion(self._get_starting_path_list()))
436
        else:
437
            path_list = list(self.path_conversion(starting_path_dirs))
438
439
        def index_of_path(paths, test_path):
440
            for q, path in enumerate(paths):
441
                if paths_equal(path, test_path):
442
                    return q
443
            return None
444
445
        if old_prefix is not None:
446
            prefix_dirs = tuple(self._get_path_dirs2(old_prefix))
447
            first_idx = index_of_path(path_list, prefix_dirs[0])
448
            if first_idx is None:
449
                first_idx = 0
450
            else:
451
                last_idx = index_of_path(path_list, prefix_dirs[-1])
452
                assert last_idx is not None
453
                del path_list[first_idx:last_idx + 1]
454
        else:
455
            first_idx = 0
456
457
        if new_prefix is not None:
458
            path_list[first_idx:first_idx] = list(self._get_path_dirs2(new_prefix))
459
460
        return tuple(path_list)
461
462
    def _build_activate_shell_custom(self, export_vars):
463
        # A method that can be overriden by shell-specific implementations.
464
        # The signature of this method may change in the future.
465
        pass
466
467
    def _update_prompt(self, set_vars, conda_prompt_modifier):
468
        pass
469
470
    def _default_env(self, prefix):
471
        if paths_equal(prefix, context.root_prefix):
472
            return 'base'
473
        return basename(prefix) if basename(dirname(prefix)) == 'envs' else prefix
474
475
    def _prompt_modifier(self, prefix, conda_default_env):
476
        if context.changeps1:
477
            return context.env_prompt.format(
478
                default_env=conda_default_env,
479
                prefix=prefix,
480
                name=basename(prefix),
481
            )
482
        else:
483
            return ""
484
485
    def _get_activate_scripts(self, prefix):
486
        return self.path_conversion(sorted(glob(join(
487
            prefix, 'etc', 'conda', 'activate.d', '*' + self.script_extension
488
        ))))
489
490
    def _get_deactivate_scripts(self, prefix):
491
        return self.path_conversion(sorted(glob(join(
492
            prefix, 'etc', 'conda', 'deactivate.d', '*' + self.script_extension
493
        )), reverse=True))
494
495
496
def expand(path):
    """Return ``path`` as an absolute path after expanding environment
    variables and then a leading ``~``."""
    with_vars = expandvars(path)
    with_home = expanduser(with_vars)
    return abspath(with_home)
498
499
500
def ensure_binary(value):
    """Return ``value`` encoded as UTF-8, or ``value`` itself when it has no
    ``encode`` attribute."""
    try:
        encoded = value.encode('utf-8')
    except AttributeError:  # pragma: no cover
        # No 'encode' attribute: assume the value is already binary and
        # hand it back untouched.
        encoded = value
    return encoded
507
508
509
def ensure_fs_path_encoding(value):
    """Return ``value`` decoded with ``FILESYSTEM_ENCODING``, or ``value``
    itself when it has no ``decode`` attribute."""
    try:
        decoded = value.decode(FILESYSTEM_ENCODING)
    except AttributeError:
        decoded = value
    return decoded
514
515
516
def native_path_to_unix(paths):  # pragma: unix no cover
    """Convert Windows-native path(s) to POSIX form.

    Off Windows this is ``path_identity(paths)``.  On Windows the paths are
    piped through ``cygpath --path -f -``; when that executable cannot be
    found, a regex-based translation is used instead.

    ``paths`` may be None (returned as None), a single string (a string is
    returned) or an iterable of strings (a tuple is returned).  Raises
    CalledProcessError when cygpath exits non-zero or writes to stderr.
    """
    # on windows, uses cygpath to convert windows native paths to posix paths
    if not on_win:
        return path_identity(paths)
    if paths is None:
        return None
    from subprocess import CalledProcessError, PIPE, Popen
    from shlex import split
    command = 'cygpath --path -f -'

    single_path = isinstance(paths, string_types)
    joined = paths if single_path else ("%s" % os.pathsep).join(paths)

    if hasattr(joined, 'encode'):
        joined = joined.encode('utf-8')

    try:
        p = Popen(split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    except EnvironmentError as e:
        if e.errno != ENOENT:
            raise
        # This code path should (hopefully) never be hit be real conda installs. It's here
        # as a backup for tests run under cmd.exe with cygpath not available.

        def _translation(found_path):  # NOQA
            found = found_path.group(1).replace("\\", "/").replace(":", "").replace("//", "/")
            return "/" + found.rstrip("/")

        # The bytes were produced with UTF-8 above, so they must be decoded with
        # UTF-8 as well; the filesystem encoding may be a different codec.
        if hasattr(joined, 'decode'):
            joined = joined.decode('utf-8')
        stdout = re.sub(
            r'([a-zA-Z]:[\/\\\\]+(?:[^:*?\"<>|;]+[\/\\\\]*)*)',
            _translation,
            joined
        ).replace(";/", ":/").rstrip(";")
    else:
        stdout, stderr = p.communicate(input=joined)
        rc = p.returncode
        if rc != 0 or stderr:
            message = "\n  stdout: %s\n  stderr: %s\n  rc: %s\n" % (stdout, stderr, rc)
            print(message, file=sys.stderr)
            raise CalledProcessError(rc, command, message)
        if hasattr(stdout, 'decode'):
            stdout = stdout.decode('utf-8')
        stdout = stdout.strip()
    final = stdout.split(':') if stdout else ()
    if single_path:
        # An empty conversion result has no first element to return.
        return final[0] if final else ''
    return tuple(final)
560
561
562
def path_identity(paths):
    """Return ``paths`` unchanged when it is None or a string; otherwise
    return its items as a tuple."""
    if paths is None or isinstance(paths, string_types):
        return paths
    return tuple(paths)
569
570
571
def paths_equal(path1, path2):
    """Return True when both paths have the same absolute form; on Windows
    the comparison is made after ``normcase``."""
    left = abspath(path1)
    right = abspath(path2)
    if on_win:
        left, right = normcase(left), normcase(right)
    return left == right
576
577
578
# Platform / interpreter flags used throughout this module.
on_win = sys.platform == "win32"
PY2 = sys.version_info.major == 2
FILESYSTEM_ENCODING = sys.getfilesystemencoding()

# Minimal Python 2/3 compatibility names.
if PY2:  # pragma: py3 no cover
    string_types = (basestring,)  # NOQA
    text_type = unicode  # NOQA

    def iteritems(d, **kw):
        """Return an iterator over the (key, value) pairs of ``d``."""
        return d.iteritems(**kw)
else:  # pragma: py2 no cover
    string_types = (str,)
    text_type = str

    def iteritems(d, **kw):
        """Return an iterator over the (key, value) pairs of ``d``."""
        return iter(d.items(**kw))
593
594
595 View Code Duplication
class PosixActivator(_Activator):
    """Activator that emits POSIX sh syntax (``\\export``, ``\\unset``, ``\\.``)."""

    def __init__(self, arguments=None):
        # Command templates.
        self.unset_var_tmpl = '\\unset %s'
        self.export_var_tmpl = "\\export %s='%s'"
        self.set_var_tmpl = "%s='%s'"
        self.run_script_tmpl = '\\. "%s"'

        # Path handling and output format.
        self.pathsep_join = ':'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.sh'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = '\n'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'etc', 'profile.d', 'conda.sh')

        super(PosixActivator, self).__init__(arguments)

    def _update_prompt(self, set_vars, conda_prompt_modifier):
        """Store in ``set_vars['PS1']`` the current PS1 with the previous conda
        modifier stripped and ``conda_prompt_modifier`` prepended."""
        prompt = self.environ.get('PS1', '')
        stale_modifier = self.environ.get('CONDA_PROMPT_MODIFIER')
        if stale_modifier:
            prompt = prompt.replace(stale_modifier, '')
        # Because we're using single-quotes to set shell variables, we need to handle the
        # proper escaping of single quotes that are already part of the string.
        # Best solution appears to be https://stackoverflow.com/a/1250279
        prompt = prompt.replace("'", "'\"'\"'")
        set_vars['PS1'] = conda_prompt_modifier + prompt

    def _hook_preamble(self):
        """Return the export line(s) placed ahead of the hook script."""
        if not on_win:
            return 'export CONDA_EXE="%s"' % context.conda_exe
        conda_exe = context.conda_exe
        conda_bat = join(context.conda_prefix, 'condacmd', 'conda.bat')
        return ('export CONDA_EXE="$(cygpath \'%s\')"\n'
                'export CONDA_BAT="%s"'
                % (conda_exe, conda_bat))
635
636
637 View Code Duplication
class CshActivator(_Activator):
    """Activator that emits csh syntax (``setenv``, ``unsetenv``, ``set``, ``source``)."""

    def __init__(self, arguments=None):
        # Command templates.
        self.unset_var_tmpl = 'unsetenv %s'
        self.export_var_tmpl = 'setenv %s "%s"'
        self.set_var_tmpl = "set %s='%s'"
        self.run_script_tmpl = 'source "%s"'

        # Path handling and output format.
        self.pathsep_join = ':'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.csh'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = ';\n'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'etc', 'profile.d', 'conda.csh')

        super(CshActivator, self).__init__(arguments)

    def _update_prompt(self, set_vars, conda_prompt_modifier):
        """Store in ``set_vars['prompt']`` the current prompt with the previous
        conda modifier stripped and ``conda_prompt_modifier`` prepended."""
        current = self.environ.get('prompt', '')
        stale_modifier = self.environ.get('CONDA_PROMPT_MODIFIER')
        if stale_modifier:
            current = current.replace(stale_modifier, '')
        set_vars['prompt'] = conda_prompt_modifier + current

    def _hook_preamble(self):
        """Return the ``setenv`` lines placed ahead of the hook script."""
        if on_win:
            # On Windows the values are passed through cygpath by the shell.
            template = ('setenv CONDA_EXE `cygpath %s`\n'
                        'setenv _CONDA_ROOT `cygpath %s`\n'
                        'setenv _CONDA_EXE `cygpath %s`')
        else:
            template = ('setenv CONDA_EXE "%s"\n'
                        'setenv _CONDA_ROOT "%s"\n'
                        'setenv _CONDA_EXE "%s"')
        return template % (context.conda_exe, context.conda_prefix, context.conda_exe)
676
677
678 View Code Duplication
class XonshActivator(_Activator):
    """Activator that emits xonsh syntax and writes it to a ``.xsh`` temp file."""

    def __init__(self, arguments=None):
        # Command templates.
        self.unset_var_tmpl = 'del $%s'
        self.export_var_tmpl = "$%s = '%s'"
        self.set_var_tmpl = "$%s = '%s'"  # TODO: determine if different than export_var_tmpl
        self.run_script_tmpl = 'source "%s"'

        # Path handling and output format.
        self.pathsep_join = ':'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.xsh'
        self.tempfile_extension = '.xsh'
        self.command_join = '\n'

        self.hook_source_path = join(CONDA_PACKAGE_ROOT, 'shell', 'conda.xsh')

        super(XonshActivator, self).__init__(arguments)

    def _hook_preamble(self):
        """Return the assignment line placed ahead of the hook script."""
        conda_exe = context.conda_exe
        return 'CONDA_EXE = "%s"' % conda_exe
699
700
701
class CmdExeActivator(_Activator):
    """Activator that emits cmd.exe batch syntax and writes it to a ``.bat`` temp file."""

    def __init__(self, arguments=None):
        # Command templates.
        self.unset_var_tmpl = '@SET %s='
        self.export_var_tmpl = '@SET "%s=%s"'
        self.set_var_tmpl = '@SET "%s=%s"'  # TODO: determine if different than export_var_tmpl
        self.run_script_tmpl = '@CALL "%s"'

        # Path handling and output format.
        self.pathsep_join = ';'.join
        self.sep = '\\'
        self.path_conversion = path_identity
        self.script_extension = '.bat'
        self.tempfile_extension = '.bat'
        self.command_join = '\r\n' if on_win else '\n'

        self.hook_source_path = None
        # TODO: cmd.exe doesn't get a hook function? Or do we need to do something different?
        #       Like, for cmd.exe only, put a special directory containing only conda.bat on PATH?

        super(CmdExeActivator, self).__init__(arguments)

    def _build_activate_shell_custom(self, export_vars):
        """On Windows, add PYTHONIOENCODING (the value of ``GetACP()``) to ``export_vars``."""
        if not on_win:
            return
        import ctypes
        export_vars["PYTHONIOENCODING"] = ctypes.cdll.kernel32.GetACP()

    def _hook_preamble(self):
        # No hook is available for this shell.
        raise NotImplementedError()
731
732
733
class FishActivator(_Activator):
    """Activator that emits fish syntax (``set -gx``, ``set -g``, ``set -e``, ``source``)."""

    def __init__(self, arguments=None):
        # Command templates.
        self.unset_var_tmpl = 'set -e %s'
        self.export_var_tmpl = 'set -gx %s "%s"'
        self.set_var_tmpl = 'set -g %s "%s"'
        self.run_script_tmpl = 'source "%s"'

        # Path handling and output format.  PATH entries are joined with '" "',
        # which combines with the quotes already present in export_var_tmpl.
        self.pathsep_join = '" "'.join
        self.sep = '/'
        self.path_conversion = native_path_to_unix
        self.script_extension = '.fish'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = ';\n'

        self.hook_source_path = join(
            CONDA_PACKAGE_ROOT, 'shell', 'etc', 'fish', 'conf.d', 'conda.fish'
        )

        super(FishActivator, self).__init__(arguments)

    def _hook_preamble(self):
        """Return the ``set`` lines placed ahead of the hook script."""
        if on_win:
            # On Windows the values are passed through cygpath by the shell.
            template = ('set -gx CONDA_EXE (cygpath "%s")\n'
                        'set _CONDA_ROOT (cygpath "%s")\n'
                        'set _CONDA_EXE (cygpath "%s")')
        else:
            template = ('set -gx CONDA_EXE "%s"\n'
                        'set _CONDA_ROOT "%s"\n'
                        'set _CONDA_EXE "%s"')
        return template % (context.conda_exe, context.conda_prefix, context.conda_exe)
764
765
766 View Code Duplication
class PowershellActivator(_Activator):
    """Activator that emits PowerShell syntax operating on ``$env:`` variables."""

    def __init__(self, arguments=None):
        self.pathsep_join = ';'.join
        self.sep = '\\'
        self.path_conversion = path_identity
        self.script_extension = '.ps1'
        self.tempfile_extension = None  # write instructions to stdout rather than a temp file
        self.command_join = '\n'

        # Values are set as environment variables ($env:NAME), so they must be
        # removed through the Env: drive as well.  Remove-Variable only touches
        # PowerShell session variables and would leave the environment variable set.
        self.unset_var_tmpl = 'Remove-Item Env:/%s'
        self.export_var_tmpl = '$env:%s = "%s"'
        self.set_var_tmpl = '$env:%s = "%s"'  # TODO: determine if different than export_var_tmpl
        self.run_script_tmpl = '. "%s"'

        self.hook_source_path = None  # TODO: doesn't yet exist

        super(PowershellActivator, self).__init__(arguments)

    def _hook_preamble(self):
        # No hook is available for this shell.
        raise NotImplementedError()
787
788
789
# Shell name (the part after ``shell.`` on the command line) -> activator class.
activator_map = dict((
    ('posix', PosixActivator),
    ('ash', PosixActivator),
    ('bash', PosixActivator),
    ('dash', PosixActivator),
    ('zsh', PosixActivator),
    ('csh', CshActivator),
    ('tcsh', CshActivator),
    ('xonsh', XonshActivator),
    ('cmd.exe', CmdExeActivator),
    ('fish', FishActivator),
    ('powershell', PowershellActivator),
))
802
803
804
def main(argv=None):
    """Run the activator selected by ``argv[1]`` (``shell.<name>``) with ``argv[2:]``.

    Prints the activator's output to stdout and returns 0.  A CondaError
    raised while executing is printed to stderr and its ``return_code`` is
    returned.  An unknown shell name raises CondaError.
    """
    from .common.compat import init_std_stream_encoding

    context.__init__()  # On import, context does not include SEARCH_PATH. This line fixes that.

    init_std_stream_encoding()
    if not argv:
        argv = sys.argv
    assert len(argv) >= 3
    assert argv[1].startswith('shell.')
    shell_name = argv[1].replace('shell.', '', 1)
    remaining_args = argv[2:]
    try:
        activator_cls = activator_map[shell_name]
    except KeyError:
        raise CondaError("%s is not a supported shell." % shell_name)
    activator = activator_cls(remaining_args)
    try:
        print(activator.execute(), end='')
    except CondaError as e:
        print(text_type(e), file=sys.stderr)
        return e.return_code
    return 0


if __name__ == '__main__':
    sys.exit(main())
833