load_file_configs()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 14
dl 0
loc 32
rs 2.7581
c 1
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A _get_st_mode() 0 6 2
A _file_yaml_loader() 0 3 2
A _dir_yaml_loader() 0 5 4

How to fix   Complexity   

Complexity

Complex units like load_file_configs() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import print_function, absolute_import, unicode_literals
2
3
import collections
4
import logging
5
import os
6
import stat
7
import warnings
8
import itertools
9
10
from os.path import exists, join, dirname, isfile, isdir, abspath, expanduser
11
from string import Template
12
13
try:
14
    from urllib import quote_plus
15
except ImportError:
16
    from urllib.parse import quote_plus
17
18
from binstar_client.utils.conda import CONDA_PREFIX, CONDA_ROOT
19
from binstar_client.utils.appdirs import AppDirs, EnvAppDirs
20
from binstar_client.errors import BinstarError
21
22
from .yaml import yaml_load, yaml_dump
23
24
25
# Module-level logger; every message emitted by this module goes through the
# logger registered under the name 'binstar'.
logger = logging.getLogger('binstar')
26
27
28
def expandvars(path):
    """
    Substitute ``$NAME`` / ``${NAME}`` placeholders in `path`.

    CONDA_ROOT and CONDA_PREFIX are always available as placeholders; values
    from the process environment are layered on top and win on a name clash.
    Placeholders with no known value are left in the result untouched.
    """
    substitutions = {'CONDA_ROOT': CONDA_ROOT, 'CONDA_PREFIX': CONDA_PREFIX}
    # The real environment is applied last so that it overrides the conda defaults.
    substitutions.update(os.environ)
    template = Template(path)
    return template.safe_substitute(substitutions)
32
33
34
def expand(path):
    """
    Return `path` as an absolute path, after expanding ``$VARIABLES`` (see
    `expandvars`) and a leading ``~``.
    """
    with_variables = expandvars(path)
    with_home = expanduser(with_variables)
    return abspath(with_home)
36
37
38
# Locate the per-user configuration. By default the platform application
# directories are used together with a fixed config file under the home
# directory; setting BINSTAR_CONFIG_DIR relocates both.
if 'BINSTAR_CONFIG_DIR' not in os.environ:
    dirs = AppDirs('binstar', 'ContinuumIO')
    USER_CONFIG = expand('~/.continuum/anaconda-client/config.yaml')
else:
    dirs = EnvAppDirs('binstar', 'ContinuumIO', os.environ['BINSTAR_CONFIG_DIR'])
    USER_CONFIG = join(dirs.user_data_dir, 'config.yaml')

USER_LOGDIR = dirs.user_log_dir

# SYSTEM_CONFIG is an alias of SITE_CONFIG: both name the same file.
SITE_CONFIG = SYSTEM_CONFIG = expand('$CONDA_ROOT/etc/anaconda-client/config.yaml')
49
50
# API endpoint used whenever the configuration does not provide a 'url'.
DEFAULT_URL = 'https://api.anaconda.org'

# Baseline settings that configuration files are merged on top of.
DEFAULT_CONFIG = {
    # Both built-in site aliases point at the default endpoint; each alias
    # gets its own dict so they can be modified independently.
    'sites': {alias: {'url': DEFAULT_URL} for alias in ('anaconda', 'binstar')},
    'auto_register': True,
    'default_site': None,
    'url': DEFAULT_URL,
    'ssl_verify': True,
}

# Names of the top-level configuration keys. Both spellings of the SSL
# verification switch are listed ('ssl_verify' and 'verify_ssl').
# NOTE(review): not referenced elsewhere in this part of the file --
# presumably consumed by the configuration command; confirm.
CONFIGURATION_KEYS = [
    'auto_register',
    'default_site',
    'upload_user',
    'sites',
    'url',
    'verify_ssl',
    'ssl_verify',
]
71
72
# Locations scanned for configuration files, in order. Entries may contain
# ``~`` and ``$CONDA_ROOT`` / ``$CONDA_PREFIX`` placeholders; they are expanded
# when the path is loaded. get_config() merges the files in this order, so
# settings found in later entries override those from earlier ones.
SEARCH_PATH = (
    dirs.site_data_dir,
    '/etc/anaconda-client/',
    '$CONDA_ROOT/etc/anaconda-client/',
    dirs.user_data_dir,
    '~/.continuum/anaconda-client/',
    '$CONDA_PREFIX/etc/anaconda-client/',
)
80
81
82
try:
    from collections.abc import Mapping as _Mapping
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Mapping as _Mapping


def recursive_update(config, update_dict):
    """
    Merge `update_dict` into `config` in place and return `config`.

    Values of `update_dict` that are mappings are merged key by key into the
    mapping already stored under the same key of `config` (which is modified
    in place). If `config` has no mapping under that key (the key is missing
    or holds a non-mapping value such as None), a new dict is created for it.
    Any other value simply replaces the one in `config`.
    """
    for update_key, updated_value in update_dict.items():
        if isinstance(updated_value, _Mapping):
            current = config.get(update_key)
            if not isinstance(current, _Mapping):
                # Nothing mergeable is stored yet: start from an empty dict
                # instead of failing on a scalar / None.
                current = {}
            config[update_key] = recursive_update(current, updated_value)
        else:
            config[update_key] = updated_value

    return config
91
92
93
def get_server_api(token=None, site=None, cls=None, config=None, **kwargs):
    """
    Get the anaconda server api class

    The client class defaults to `binstar_client.Binstar` and the configuration
    to `get_config(site=site)`. The token is taken, in order of preference,
    from the `token` argument, the BINSTAR_API_TOKEN environment variable, the
    ANACONDA_API_TOKEN environment variable, or the token stored for the
    configured url. Remaining keyword arguments are forwarded to `cls`.
    """
    if not cls:
        # Imported only when needed.
        from binstar_client import Binstar
        cls = Binstar

    if config is None:
        config = get_config(site=site)

    url = config.get('url', DEFAULT_URL)
    logger.info("Using Anaconda API: %s", url)

    environ = os.environ
    if token:
        logger.debug("Using token from command line args")
    elif 'BINSTAR_API_TOKEN' in environ:
        logger.debug("Using token from environment variable BINSTAR_API_TOKEN")
        token = environ['BINSTAR_API_TOKEN']
    elif 'ANACONDA_API_TOKEN' in environ:
        logger.debug("Using token from environment variable ANACONDA_API_TOKEN")
        token = environ['ANACONDA_API_TOKEN']
    else:
        token = load_token(url)

    # 'ssl_verify' wins; 'verify_ssl' is consulted only when it is absent.
    legacy_verify = config.get('verify_ssl', True)
    verify = config.get('ssl_verify', legacy_verify)

    return cls(token, domain=url, verify=verify, **kwargs)
121
122
123
def get_binstar(args=None, cls=None):
    """
    DEPRECATED METHOD,

    use `get_server_api`

    Reads `token`, `log_level` and `site` from `args` (falling back to None,
    logging.INFO and None when an attribute is missing) and delegates to
    `get_server_api`.
    """
    warnings.warn(
        'method get_binstar is deprecated, please use `get_server_api`',
        DeprecationWarning
    )

    return get_server_api(
        token=getattr(args, 'token', None),
        log_level=getattr(args, 'log_level', logging.INFO),
        site=getattr(args, 'site', None),
        cls=cls,
    )
141
142
143
# Preferred token directory: a 'tokens' folder next to the user config file.
TOKEN_DIR = join(dirname(USER_CONFIG), 'tokens')

# Every directory in which token files are stored and looked up; the
# preferred directory is the last entry.
TOKEN_DIRS = [dirs.user_data_dir, TOKEN_DIR]
148
149
150
def store_token(token, args):
    """
    Write `token` to a token file in every directory of TOKEN_DIRS.

    The file name is derived from the url configured for `args.site` (or the
    default site when `args` is falsy). Missing directories are created and an
    existing token file is replaced. The file is readable and writable by its
    owner only.
    """
    config = get_config(site=args and args.site)
    # The url does not depend on the directory, so resolve it once.
    url = config.get('url', DEFAULT_URL)
    token_filename = '%s.token' % quote_plus(url)

    for token_dir in TOKEN_DIRS:
        if not isdir(token_dir):
            os.makedirs(token_dir)
        tokenfile = join(token_dir, token_filename)

        if isfile(tokenfile):
            os.unlink(tokenfile)

        # Create the file with owner-only permissions from the start, so the
        # token is never readable by other users, not even briefly.
        descriptor = os.open(tokenfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(descriptor, 'w') as fd:
            fd.write(token)
        os.chmod(tokenfile, stat.S_IWRITE | stat.S_IREAD)
165
166
167
def load_token(url):
    """
    Return the stored token for `url`, or None when there is none.

    The directories of TOKEN_DIRS are searched in order and the first
    non-empty token file wins. Token files that turn out to be empty are
    deleted along the way.
    """
    for token_dir in TOKEN_DIRS:
        tokenfile = join(token_dir, '%s.token' % quote_plus(url))
        if not isfile(tokenfile):
            continue

        logger.debug("Found login token: {}".format(tokenfile))
        with open(tokenfile) as fd:
            token = fd.read().strip()

        if token:
            return token

        # An empty file is useless: remove it and keep looking.
        logger.debug("Token file is empty: {}".format(tokenfile))
        logger.debug("Removing file: {}".format(tokenfile))
        os.unlink(tokenfile)

    return None
182
183
184
def remove_token(args):
    """
    Delete the token file of the url configured for `args.site` (or for the
    default site when `args` is falsy) from every directory of TOKEN_DIRS.
    Directories that hold no such file are skipped.
    """
    url = get_config(site=args and args.site).get('url', DEFAULT_URL)

    candidates = (
        join(token_dir, '%s.token' % quote_plus(url))
        for token_dir in TOKEN_DIRS
    )
    for tokenfile in candidates:
        if isfile(tokenfile):
            os.unlink(tokenfile)
192
193
194
def load_config(config_file):
    """
    Parse the YAML file `config_file` and return its content.

    An empty dict is returned when the file does not exist or when its parsed
    content is empty / falsy.
    """
    if not exists(config_file):
        return {}

    with open(config_file) as fd:
        data = yaml_load(fd)

    return data if data else {}
202
203
204
def load_file_configs(search_path):
    """
    Load every configuration file reachable from `search_path`.

    Each entry of `search_path` is expanded with `expand` and then handled
    according to what it is on disk:

    * a regular file is loaded directly (its name must end in ``.yml``,
      ``.yaml`` or ``anacondarc``);
    * a directory contributes all the ``*.yml`` / ``*.yaml`` files it contains,
      in sorted file-name order;
    * anything else (missing path, device, socket, ...) is skipped.

    Returns an OrderedDict mapping each file path to its parsed content, in
    the order the files were found.
    """
    def _file_yaml_loader(fullpath):
        assert fullpath.endswith((".yml", ".yaml", "anacondarc")), fullpath
        yield fullpath, load_config(fullpath)

    def _dir_yaml_loader(fullpath):
        # os.listdir gives no ordering guarantee; sort so that the precedence
        # between files of one directory is the same on every run and platform.
        for filename in sorted(os.listdir(fullpath)):
            if filename.endswith((".yml", ".yaml")):
                filepath = join(fullpath, filename)
                yield filepath, load_config(filepath)

    # map a stat result to a file loader or a directory loader
    _loader = {
        stat.S_IFREG: _file_yaml_loader,
        stat.S_IFDIR: _dir_yaml_loader,
    }

    def _get_st_mode(path):
        # stat the path for file type, or None if path doesn't exist
        try:
            return stat.S_IFMT(os.stat(path).st_mode)
        except OSError:
            return None

    raw_data = collections.OrderedDict()
    for path in search_path:
        fullpath = expand(path)
        # .get() also covers file types without a loader, not just missing paths.
        loader = _loader.get(_get_st_mode(fullpath))
        if loader is None:
            continue
        for filepath, data in loader(fullpath):
            raw_data[filepath] = data

    return raw_data
236
237
238
def get_config(site=None):
    """
    Build the effective configuration.

    Starts from a private copy of DEFAULT_CONFIG, merges in every configuration
    file found along SEARCH_PATH (later files override earlier ones) and
    finally lifts the settings of the selected site to the top level.

    `site` is the site alias to select; when it is falsy, the 'default_site'
    setting is used instead. An alias that is not present under 'sites' only
    triggers a warning. Returns the resulting configuration dict.
    """
    import copy

    # A deep copy is required: the merge below modifies nested dicts in place,
    # and a shallow copy would leak file settings into DEFAULT_CONFIG itself.
    config = copy.deepcopy(DEFAULT_CONFIG)

    file_configs = load_file_configs(SEARCH_PATH)
    for file_config in file_configs.values():
        recursive_update(config, file_config)

    site = site or config.get('default_site')
    # A configuration file may set 'sites' to null; treat that as "no sites".
    sites = config.get('sites') or {}

    if site:
        site = str(site)

        if site not in sites:
            logger.warning('Site alias "%s" does not exist in the config file', site)
        else:
            # This takes whatever keys are set for the site into the top level of the config dict
            recursive_update(config, sites.get(site) or {})

    return config
258
259
260
def save_config(data, config_file):
    """
    Write `data` as YAML to `config_file`, creating its directory if needed.

    Raises BinstarError (carrying the file name and the system error message)
    when the directory or the file cannot be written.
    """
    data_dir = dirname(config_file)

    try:
        # dirname() is '' for a bare file name, which means the current
        # directory: there is nothing to create in that case.
        if data_dir and not exists(data_dir):
            os.makedirs(data_dir)

        with open(config_file, 'w') as fd:
            yaml_dump(data, stream=fd)
    except EnvironmentError as exc:
        raise BinstarError('%s: %s' % (exc.filename, exc.strerror,))
271
272
273
def set_config(data, user=True):
    """
    DEPRECATED: use `save_config`.

    Saves `data` to USER_CONFIG when `user` is true, to SYSTEM_CONFIG
    otherwise.
    """
    warnings.warn('Use save_config instead of set_config', DeprecationWarning)

    if user:
        target = USER_CONFIG
    else:
        target = SYSTEM_CONFIG
    save_config(data, target)
276