Completed
Push — master ( 616cf9...082fe8 )
by Kale
62:33
created

_get_activated_env_vars_unix()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 20
rs 9.4285
1
# -*- coding: utf-8 -*-
2
from __future__ import absolute_import, division, print_function, unicode_literals
3
4
import json
5
import os
6
from os.path import isfile, join
7
from subprocess import Popen
8
import sys
9
from tempfile import NamedTemporaryFile
10
11
from .. import CondaError
12
from ..base.context import context
13
from ..common.compat import ensure_binary, iteritems, on_win
14
from ..exceptions import CommandNotFoundError
15
from ..gateways.disk.delete import rm_rf
16
from ..gateways.subprocess import subprocess_call
17
18
19
class ExecutableNotFound(CondaError):
    """Error raised when an executable cannot be located in the target prefix."""

    def __init__(self, target_prefix, executable_name):
        # The %(name)s placeholders match the keyword arguments passed to the
        # base class below (presumably interpolated by CondaError -- confirm).
        template = "\n".join((
            "The executable was not found in the target prefix.",
            "  target prefix: %(target_prefix)s",
            "  executable name: %(executable_name)s",
        ))
        super(ExecutableNotFound, self).__init__(
            template,
            target_prefix=target_prefix,
            executable_name=executable_name,
        )
28
29
30
def _get_activated_env_vars_win(env_location):
    """Return the environment variables visible after activating ``env_location``.

    A temporary batch file that prints ``os.environ`` as JSON is written to disk.
    One ``cmd.exe`` command line then activates the environment through the
    script named by the ``CONDA_BAT`` environment variable and runs that batch
    file. The temporary file is removed afterwards, whether or not the call
    succeeded.

    :param env_location: location of the environment to activate.
    :return: the object decoded from the JSON printed on the child's stdout.
    :raises CommandNotFoundError: if ``CONDA_BAT`` is not set.
    :raises CondaError: if the child process wrote anything to stderr.
    """
    try:
        conda_bat = os.environ["CONDA_BAT"]
    except KeyError:
        raise CommandNotFoundError("run")

    temp_path = None
    try:
        # delete=False: the file is closed when the with-block ends, before
        # cmd.exe is asked to run it; it is removed in the finally clause.
        with NamedTemporaryFile('w+b', suffix='.bat', delete=False) as tf:
            temp_path = tf.name
            tf.write(ensure_binary(
                "@%CONDA_PYTHON_EXE% -c \"import os, json; print(json.dumps(dict(os.environ)))\""
            ))
        # NOTE(review): /K leaves cmd.exe running after the command (unlike /C);
        # confirm subprocess_call closes stdin so the shell terminates.
        cmd_builder = [
            "cmd.exe /K \"",
            "@SET PROMPT= ",
            "&&",
            # "&&" is attached directly to the value: with a space in between,
            # SET would store the trailing space as part of the value.
            "@SET CONDA_CHANGEPS1=false&&",
            "@CALL {0} activate \"{1}\"".format(conda_bat, env_location),
            "&&",
            "\"{0}\"".format(temp_path),
            "\"",
        ]
        cmd = " ".join(cmd_builder)
        result = subprocess_call(cmd)
    finally:
        if temp_path:
            rm_rf(temp_path)

    # Explicit check instead of ``assert``: assertions are stripped under -O,
    # which would let a failed activation fall through to json.loads.
    if result.stderr:
        raise CondaError(
            "Activating the environment wrote to stderr.\n"
            "  environment location: %(env_location)s\n"
            "  stderr: %(stderr)s",
            env_location=env_location,
            stderr=result.stderr,
        )
    return json.loads(result.stdout)
63
64
65
def _get_activated_env_vars_unix(env_location):
    """Return the environment variables visible after activating ``env_location``.

    Builds one ``sh -c '...'`` command line that evaluates the
    ``shell.posix hook`` output of the executable named by ``CONDA_EXE``, runs
    ``conda activate`` on ``env_location`` and finally has
    ``$CONDA_PYTHON_EXE`` print ``os.environ`` as JSON.

    :param env_location: location of the environment to activate.
    :return: the object decoded from the JSON printed on the child's stdout.
    :raises CommandNotFoundError: if ``CONDA_EXE`` is not set.
    :raises CondaError: if the child process wrote anything to stderr.
    """
    try:
        conda_exe = os.environ["CONDA_EXE"]
    except KeyError:
        raise CommandNotFoundError("run")

    cmd_builder = [
        # The opening single quote of the sh script sits directly before "eval".
        "sh -c 'eval \"$(\"{0}\" shell.posix hook)\"".format(conda_exe),
        "&&",
        "conda activate \"{0}\"".format(env_location),
        "&&",
        "\"$CONDA_PYTHON_EXE\" -c \"import os, json; print(json.dumps(dict(os.environ)))\"",
        "'",
    ]
    cmd = " ".join(cmd_builder)
    result = subprocess_call(cmd)

    # Explicit check instead of ``assert``: assertions are stripped under -O,
    # which would let a failed activation fall through to json.loads.
    if result.stderr:
        raise CondaError(
            "Activating the environment wrote to stderr.\n"
            "  environment location: %(env_location)s\n"
            "  stderr: %(stderr)s",
            env_location=env_location,
            stderr=result.stderr,
        )
    return json.loads(result.stdout)
85
86
87
def get_activated_env_vars():
    """Return the activated environment variables for ``context.target_prefix``.

    Delegates to the Windows or unix helper depending on ``on_win`` and passes
    every key and value of the result through ``str()``.
    """
    prefix = context.target_prefix
    collect = _get_activated_env_vars_win if on_win else _get_activated_env_vars_unix
    raw_vars = collect(prefix)
    return dict((str(name), str(value)) for name, value in iteritems(raw_vars))
95
96
97
def find_executable(executable_name):
    """Locate ``executable_name`` inside ``context.target_prefix``.

    :param executable_name: name of the executable to look for.
    :return: the path reported by the platform-specific lookup helper.
    :raises ExecutableNotFound: if the helper returns ``None``.
    """
    prefix = context.target_prefix
    locate = _find_executable_win if on_win else _find_executable_unix
    found = locate(prefix, executable_name)
    if found is not None:
        return found
    raise ExecutableNotFound(prefix, executable_name)
106
107
108
def _find_executable_win(target_prefix, executable_name):
    """Search the prefix's path directories for ``executable_name`` on Windows.

    The directories come from ``_Activator._get_path_dirs(target_prefix)``. If
    ``executable_name`` already ends with one of the ``PATHEXT`` extensions
    (compared case-insensitively), it is looked up as given. Otherwise each
    ``PATHEXT`` extension is appended in turn.

    :param target_prefix: environment prefix whose path directories are searched.
    :param executable_name: executable name, with or without an extension.
    :return: path of the first existing file found, or ``None``.
    :raises KeyError: if ``PATHEXT`` is not set in the environment.
    """
    from ..activate import _Activator

    # Drop empty entries (e.g. from a trailing ';'): str.endswith('') is always
    # true and would make every name look like it already has an extension.
    pathext = tuple(ext for ext in os.environ["PATHEXT"].split(';') if ext)
    lowered_exts = tuple(ext.lower() for ext in pathext)

    # Compare case-insensitively so "python.exe" matches a ".EXE" entry.
    if not pathext or executable_name.lower().endswith(lowered_exts):
        candidate_names = (executable_name,)
    else:
        candidate_names = tuple(executable_name + ext for ext in pathext)

    for path_dir in _Activator._get_path_dirs(target_prefix):
        for candidate_name in candidate_names:
            executable_path = join(path_dir, candidate_name)
            if isfile(executable_path):
                return executable_path
    return None
123
124
125
def _find_executable_unix(target_prefix, executable_name):
    """Return ``<target_prefix>/bin/<executable_name>`` if it is an executable file.

    :param target_prefix: environment prefix to look in.
    :param executable_name: file name expected inside the prefix's ``bin`` directory.
    :return: the joined path when it is a regular file for which
        ``os.access(..., os.X_OK)`` is true, otherwise ``None``.
    """
    candidate = join(target_prefix, 'bin', executable_name)
    if not isfile(candidate):
        return None
    if not os.access(candidate, os.X_OK):
        return None
    return candidate
130
131
132
def _exec_win(executable_path, extra_args=(), env_vars=None):
    """Run ``executable_path`` as a child process and exit with its return code.

    :param executable_path: path of the program to start.
    :param extra_args: additional command-line arguments for the program.
    :param env_vars: environment for the child; a copy of ``os.environ`` is
        used when ``None``.
    :raises SystemExit: always, once the child has finished, carrying the
        child's return code.
    """
    env_vars = os.environ.copy() if env_vars is None else env_vars
    args = [executable_path]
    args.extend(extra_args)
    p = Popen(args, env=env_vars)
    try:
        p.communicate()
    except KeyboardInterrupt:
        # Keep waiting so the child's real exit status can be reported.
        p.wait()
    # Exit outside of any ``finally`` clause: raising SystemExit from a finally
    # block would replace an unexpected in-flight exception and could exit with
    # status 0 while ``returncode`` is still None.
    sys.exit(p.returncode)
143
144
145
def _exec_unix(executable_path, extra_args=(), env_vars=None):
    """Replace the current process with ``executable_path`` via ``os.execve``.

    :param executable_path: path of the program to execute; also used as argv[0].
    :param extra_args: additional command-line arguments for the program.
    :param env_vars: environment for the new program; a copy of ``os.environ``
        is used when ``None``.
    """
    if env_vars is None:
        env_vars = os.environ.copy()
    argv = [executable_path] + list(extra_args)
    os.execve(executable_path, argv, env_vars)
150
151
152
def execute(args, parser):
    """Find the requested executable and run it with the activated environment.

    :param args: object providing ``executable_name`` and ``extra_args``.
    :param parser: not used by this function.
    """
    target = find_executable(args.executable_name)
    activated_env = get_activated_env_vars()
    launcher = _exec_win if on_win else _exec_unix
    launcher(target, args.extra_args, activated_env)
159
160
161
if __name__ == "__main__":
    # When run as a script, print the activated environment mapping.
    activated_env_vars = get_activated_env_vars()
    print(activated_env_vars)
163