Completed
Push — master ( 24da64...15c423 )
by Gonzalo
65:23
created

validate_keys()   C

Complexity

Conditions 7

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
c 1
b 0
f 0
dl 0
loc 21
rs 6.4705
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2012 Anaconda, Inc
3
# SPDX-License-Identifier: BSD-3-Clause
4
from __future__ import absolute_import, print_function
5
6
from collections import OrderedDict
7
from itertools import chain
8
import os
9
10
from conda.base.context import context
11
from conda.cli import common  # TODO: this should never have to import from conda.cli
12
from conda.common.serialize import yaml_load_standard
13
from conda.core.prefix_data import PrefixData
14
from conda.models.enums import PackageType
15
from conda.models.match_spec import MatchSpec
16
from conda.models.prefix_graph import PrefixGraph
17
from conda_env.yaml import dump
18
from . import compat, exceptions, yaml
19
20
try:
21
    from cytoolz.itertoolz import concatv, groupby
22
except ImportError:  # pragma: no cover
23
    from conda._vendor.toolz.itertoolz import concatv, groupby  # NOQA
24
25
26
VALID_KEYS = ('name', 'dependencies', 'prefix', 'channels')


def validate_keys(data, kwargs):
    """Check for unknown keys, remove them and print a warning.

    Args:
        data: Mapping of top-level sections parsed from an environment
            file. ``None`` (what a YAML loader yields for an empty
            document) is treated as an empty mapping.
        kwargs: Mapping of extra options; only ``'filename'`` is read, to
            name the offending file in the warning.

    Returns:
        A copy of ``data`` without the keys that are not in ``VALID_KEYS``.
        ``data`` itself is left unmodified.
    """
    if data is None:
        # An empty file parses to None; there is nothing to validate.
        data = {}

    new_data = data.copy()
    invalid_keys = [key for key in data if key not in VALID_KEYS]
    for key in invalid_keys:
        new_data.pop(key)

    if invalid_keys:
        filename = kwargs.get('filename')
        verb = 'are' if len(invalid_keys) != 1 else 'is'
        plural = 's' if len(invalid_keys) != 1 else ''
        print("\nEnvironmentSectionNotValid: The following section{plural} on "
              "'{filename}' {verb} invalid and will be ignored:"
              "".format(filename=filename, plural=plural, verb=verb))
        for key in invalid_keys:
            print(' - {}'.format(key))
        print('')

    return new_data
50
51
52
def load_from_directory(directory):
    """Load and return an ``Environment`` from a given ``directory``.

    Looks for ``environment.yml`` and then ``environment.yaml`` in
    ``directory``; when neither is found the search moves to the parent
    directory, and so on until ``os.path.dirname`` stops changing the path.

    Raises:
        exceptions.EnvironmentFileNotFound: if no candidate file is found
            at any level.
    """
    candidates = ['environment.yml', 'environment.yaml']
    current = directory
    while True:
        for candidate in candidates:
            try:
                return from_file(os.path.join(current, candidate))
            except exceptions.EnvironmentFileNotFound:
                continue
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() is a fixed point here: nothing further up to search.
            raise exceptions.EnvironmentFileNotFound(candidates[0])
        current = parent
66
67
68
# TODO tests!!!
69
def from_environment(name, prefix, no_builds=False, ignore_channels=False):
    """Build an ``Environment`` describing the packages found in a prefix.

    Args:
        name: Name given to the resulting environment.
        prefix: Path of the prefix whose package records are read.
        no_builds: When true, conda dependencies are written as
            ``name=version``; otherwise as ``name=version=build``.
        ignore_channels: When true, only the channels from ``context`` are
            listed; otherwise the channel of every conda record is added
            as well.

    Returns:
        Environment object
    """
    records = tuple(PrefixGraph(PrefixData(prefix).iter_records()).graph)
    by_type = groupby(lambda rec: rec.package_type, records)

    def _sorted_records(*package_types):
        # Pool the groups in the order given, then sort the pool by name.
        pooled = concatv(*[by_type.get(kind, ()) for kind in package_types])
        return sorted(pooled, key=lambda rec: rec.name)

    conda_precs = _sorted_records(
        None,
        PackageType.NOARCH_GENERIC,
        PackageType.NOARCH_PYTHON,
    )
    # SHADOW_PYTHON_EGG_LINK records are not included in the pip section.
    pip_precs = _sorted_records(
        PackageType.SHADOW_PYTHON_DIST_INFO,
        PackageType.SHADOW_PYTHON_EGG_INFO_DIR,
        PackageType.SHADOW_PYTHON_EGG_INFO_FILE,
    )

    dependencies = []
    for rec in conda_precs:
        if no_builds:
            parts = (rec.name, rec.version)
        else:
            parts = (rec.name, rec.version, rec.build)
        dependencies.append('='.join(parts))
    if pip_precs:
        pip_specs = ["%s==%s" % (rec.name, rec.version) for rec in pip_precs]
        dependencies.append({'pip': pip_specs})

    channels = list(context.channels)
    if not ignore_channels:
        for rec in conda_precs:
            channel_name = rec.channel.canonical_name
            if channel_name in channels:
                continue
            # Channels not already listed are placed at the front.
            channels.insert(0, channel_name)
    return Environment(name=name, dependencies=dependencies, channels=channels, prefix=prefix)
110
111
112
def from_yaml(yamlstr, **kwargs):
    """Load and return a ``Environment`` from a given ``yaml string``.

    The parsed document is passed through ``validate_keys`` first. Every
    keyword argument is then stored in the resulting mapping, replacing a
    section of the same name, before it is handed to ``Environment``.
    """
    sections = validate_keys(yaml_load_standard(yamlstr), kwargs)
    # Keyword arguments take precedence over what the YAML provided.
    sections.update(kwargs)
    return Environment(**sections)
122
123
124
def from_file(filename):
    """Read ``filename`` and return the ``Environment`` built by ``from_yaml``.

    Raises:
        exceptions.EnvironmentFileNotFound: if ``filename`` does not exist.
    """
    if os.path.exists(filename):
        with open(filename, 'r') as handle:
            # The file name is forwarded so it ends up on the Environment.
            return from_yaml(handle.read(), filename=filename)
    raise exceptions.EnvironmentFileNotFound(filename)
130
131
132
# TODO test explicitly
133
class Dependencies(OrderedDict):
    """Ordered mapping from installer name to the specs parsed from ``raw``.

    ``raw`` is the dependency list as supplied by the caller: plain entries
    plus dicts such as ``{'pip': [...]}``. It is kept on the instance as
    ``self.raw`` and re-parsed whenever an entry is added.
    """

    def __init__(self, raw, *args, **kwargs):
        """Store ``raw`` and populate the mapping from it.

        Args:
            raw: List of dependency entries, or ``None``/empty for none.
            *args, **kwargs: Forwarded to ``OrderedDict``.
        """
        super(Dependencies, self).__init__(*args, **kwargs)
        self.raw = raw
        self.parse()

    def parse(self):
        """Rebuild the mapping from ``self.raw``.

        Does nothing when ``raw`` is empty. Otherwise the ``'conda'`` list
        is reset, dict entries are merged into the mapping as they are, and
        every other entry is converted with ``common.arg2spec`` and appended
        to ``'conda'``.
        """
        if not self.raw:
            return

        self.update({'conda': []})

        for line in self.raw:
            if isinstance(line, dict):
                self.update(line)
            else:
                self['conda'].append(common.arg2spec(line))

        if 'pip' in self:
            # An empty pip section is dropped from the mapping.
            if not self['pip']:
                del self['pip']
            # A pip section was declared, so make sure pip itself is listed
            # among the conda specs (this also runs when the section was
            # just dropped for being empty).
            if not any(MatchSpec(s).name == 'pip' for s in self['conda']):
                self['conda'].append('pip')

    def add(self, package_name):
        """Append ``package_name`` to ``raw`` and re-parse.

        An entry equal to one already in ``raw`` is not appended again.
        A ``raw`` of ``None`` is replaced by a new list first, so adding to
        an instance created without dependencies works.
        """
        if self.raw is None:
            self.raw = []
        if package_name not in self.raw:
            self.raw.append(package_name)
        self.parse()
161
162
163
def unique(seq, key=None):
    """ Yield the elements of a sequence, skipping repeats

    The first occurrence of each element is kept, in order.

    >>> tuple(unique((1, 2, 3)))
    (1, 2, 3)
    >>> tuple(unique((1, 2, 1, 3)))
    (1, 2, 3)

    With ``key``, two elements count as equal when ``key`` maps them to
    the same value

    >>> tuple(unique(['cat', 'mouse', 'dog', 'hen'], key=len))
    ('cat', 'mouse')
    """
    observed = set()
    for element in seq:
        marker = element if key is None else key(element)
        if marker in observed:
            continue
        observed.add(marker)
        yield element
186
187
188
class Environment(object):
    """An environment definition: name, channels, dependencies and prefix."""

    def __init__(self, name=None, filename=None, channels=None,
                 dependencies=None, prefix=None):
        """Store the given fields; ``dependencies`` is wrapped in ``Dependencies``.

        ``channels`` defaults to a new empty list when omitted.
        """
        self.name = name
        self.filename = filename
        self.prefix = prefix
        self.dependencies = Dependencies(dependencies)
        self.channels = [] if channels is None else channels

    def add_channels(self, channels):
        """Put ``channels`` ahead of the current ones, dropping repeats."""
        merged = chain(channels, self.channels)
        self.channels = list(unique(merged))

    def remove_channels(self):
        """Reset the channel list to empty."""
        self.channels = []

    def to_dict(self):
        """Return a ``yaml.dict`` with ``name`` plus every non-empty section.

        The ``dependencies`` entry holds the raw dependency list, not the
        parsed mapping.
        """
        result = yaml.dict([('name', self.name)])
        sections = (
            ('channels', self.channels, self.channels),
            ('dependencies', self.dependencies, self.dependencies.raw),
            ('prefix', self.prefix, self.prefix),
        )
        for key, guard, value in sections:
            if guard:
                result[key] = value
        return result

    def to_yaml(self, stream=None):
        """Dump the environment as YAML.

        Returns the text when ``stream`` is ``None``; otherwise writes the
        UTF-8 encoded text to ``stream`` and returns ``None``.
        """
        text = compat.u(dump(self.to_dict()))
        if stream is not None:
            stream.write(compat.b(text, encoding="utf-8"))
            return None
        return text

    def save(self):
        """Write the YAML form of the environment to ``self.filename``."""
        with open(self.filename, "wb") as handle:
            self.to_yaml(stream=handle)
226