Test Failed
Push — master ( db4166...efa4d0 )
by Thomas
11:36
created

exabgp.environment.parsing.syslog_name()   A

Complexity

Conditions 5

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 9
nop 1
dl 0
loc 9
rs 9.3333
c 0
b 0
f 0
1
"""
2
environment.py
3
4
Created by Thomas Mangin on 2020-05-14.
5
Copyright (c) 2011-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import sys
11
import pwd
12
13
import syslog
14
15
from exabgp.util.ip import isip
16
from exabgp.protocol.ip import IP
17
18
from exabgp.environment import base
19
20
21
# Accepted log level names; 'CRITICAL' and 'ERROR' are long spellings that
# the syslog helpers in this module translate to 'CRIT' and 'ERR'.
log_levels = [
    'EMERG',
    'ALERT',
    'CRIT',
    'CRITICAL',
    'ERR',
    'ERROR',
    'WARNING',
    'NOTICE',
    'INFO',
    'DEBUG',
]
22
23
24
def integer(_):
    """Return the value converted with int()."""
    number = int(_)
    return number
26
27
28
def real(_):
    """Return the value converted with float()."""
    number = float(_)
    return number
30
31
32
def lowunquote(_):
    """Strip surrounding whitespace, then surrounding quotes, and lower-case the result."""
    trimmed = _.strip()
    bare = trimmed.strip('\'"')
    return bare.lower()
34
35
36
def unquote(_):
    """Strip surrounding whitespace, then any surrounding single or double quotes."""
    trimmed = _.strip()
    return trimmed.strip('\'"')
38
39
40
def quote(_):
    """Return the string form of the value wrapped in single quotes."""
    text = str(_)
    return "'%s'" % text
42
43
44
def quote_list(_):
    """Return the items, stringified and space-separated, wrapped in single quotes."""
    joined = ' '.join(map(str, _))
    return "'%s'" % joined
46
47
48
def nop(_):
    """Return the value unchanged (identity converter)."""
    return _
50
51
52
def boolean(_):
    """Return True when the lower-cased value is one of the accepted 'true' spellings."""
    truthy = ('1', 'yes', 'on', 'enable', 'true')
    lowered = _.lower()
    return lowered in truthy
54
55
56
def api(_):
    """Validate an API encoder name.

    Returns the lower-cased name when it is 'text' or 'json'.
    Raises TypeError for any other value.
    """
    encoder = _.lower()
    if encoder in ('text', 'json'):
        return encoder
    raise TypeError('invalid encoder')
61
62
63
def methods(_):
    """Upper-case the value and split it on whitespace into a list of words."""
    uppercased = _.upper()
    return uppercased.split()
65
66
67
def list(_):
    """Return the strings of the iterable, space-separated, wrapped in single quotes.

    NOTE: this name shadows the builtin ``list`` for the rest of the module.
    """
    joined = ' '.join(_)
    return "'%s'" % joined
69
70
71
def lower(_):
    """Return the string form of the value, lower-cased."""
    text = str(_)
    return text.lower()
73
74
75
def ip(_):
    """Return the value unchanged when isip() accepts it.

    Raises TypeError when isip() rejects the value.
    """
    if not isip(_):
        raise TypeError('ip %s is invalid' % _)
    return _
79
80
81
def ip_list(_):
    """Parse a space-separated string of addresses into a list of IP objects.

    Empty tokens (produced by consecutive spaces) are skipped.
    Raises TypeError on the first token that isip() rejects.
    """
    addresses = []
    for token in _.split(' '):
        if not token:
            # consecutive spaces yield empty tokens; ignore them
            continue
        if not isip(token):
            raise TypeError('ip %s is invalid' % token)
        addresses.append(IP.create(token))
    return addresses
91
92
93
def user(_):
    """Return the user name unchanged if the password database knows it.

    Raises TypeError when pwd.getpwnam() has no entry for the name.
    """
    # XXX: incomplete
    try:
        # only existence is checked; the entry itself (uid, ...) is not used
        pwd.getpwnam(_)
    except KeyError:
        raise TypeError('user %s is not found on this system' % _)
    else:
        return _
101
102
103
def folder(path):
    """Return the first candidate location of ``path`` that exists on disk.

    The candidates come from root(path), in the order it returns them.
    Raises TypeError when none of the candidates exists.
    """
    candidates = root(path)
    # Test each candidate itself: the previous code tested the raw ``path``
    # argument, so it either kept every candidate or none of them.
    existing = [candidate for candidate in candidates if os.path.exists(candidate)]
    if not existing:
        raise TypeError('%s does not exists' % path)
    # No emptiness check is needed on the result: root() builds every
    # candidate with os.path.normpath(), which never returns an empty string.
    return existing[0]
112
113
114
def path(path):
    """Return ``path`` in single quotes, shortened for display.

    If the program name (sys.argv[0]) contains 'lib/exabgp', the text before
    it is removed from the start of ``path`` when present. A leading home
    directory is then replaced with '~'.
    """
    pieces = sys.argv[0].split('lib/exabgp')
    if len(pieces) > 1:
        # everything before the first 'lib/exabgp' in the program name
        prefix = pieces[0]
        if prefix and path.startswith(prefix):
            path = path[len(prefix):]

    home = os.path.expanduser('~')
    if not path.startswith(home):
        return "'%s'" % path
    return "'~%s'" % path[len(home):]
124
125
126
def conf(path):
    """Return the location found by folder(path), which must be a regular file.

    Raises TypeError when the location is not a file; folder() raises
    TypeError itself when nothing is found.
    """
    located = folder(path)
    if os.path.isfile(located):
        return located
    raise TypeError('%s is not a file' % path)
131
132
133
def exe(path):
    """Return the file found by conf(path), which must be executable.

    Raises TypeError when os.access() reports the file is not executable;
    conf() raises TypeError itself when the file is missing.
    """
    located = conf(path)
    if os.access(located, os.X_OK):
        return located
    raise TypeError('%s is not an executable' % located)
138
139
140
# def syslog(path):
141
#     path = unquote(path)
142
#     if path in ('stdout', 'stderr'):
143
#         return path
144
#     if path.startswith('host:'):
145
#         return path
146
#     return path
147
148
149
def umask_read(_):
    """Parse the value as a base-8 (octal) integer."""
    mask = int(_, 8)
    return mask
151
152
153
def umask_write(_):
    """Return the oct() representation of the value wrapped in single quotes."""
    octal = oct(_)
    return "'%s'" % octal
155
156
157
def syslog_value(log):
    """Return the syslog priority constant for a log level name.

    ``log`` must be one of the names in ``log_levels``. The long spellings
    'CRITICAL' and 'ERROR' are translated to 'CRIT' and 'ERR', because the
    syslog module only defines LOG_CRIT and LOG_ERR.

    Raises TypeError when ``log`` is not a known level name.
    """
    if log not in log_levels:
        raise TypeError('invalid log level %s' % log)
    # The translation must happen for *valid* names: previously it sat in the
    # invalid-name branch, so 'CRITICAL'/'ERROR' reached getattr() untranslated
    # and failed with AttributeError.
    aliases = {'CRITICAL': 'CRIT', 'ERROR': 'ERR'}
    return getattr(syslog, 'LOG_%s' % aliases.get(log, log))
165
166
167
def syslog_name(log):
    """Return the short level name whose syslog constant equals ``log``.

    The names in ``log_levels`` are tried in order; 'CRITICAL' and 'ERROR'
    are looked up (and reported) as 'CRIT' and 'ERR'.

    Raises TypeError when no level matches.
    """
    aliases = {'CRITICAL': 'CRIT', 'ERROR': 'ERR'}
    for level in log_levels:
        canonical = aliases.get(level, level)
        if getattr(syslog, 'LOG_%s' % canonical) == log:
            return canonical
    raise TypeError('invalid log level %s' % log)
176
177
178
def root(path):
    """Return the candidate absolute locations for ``path``.

    Three candidates are produced, in this order:
      1. ``path`` under the directory that contains the last 'lib'
         component of ``base.root``,
      2. ``path`` unquoted, with a leading '~' expanded,
      3. ``path`` under '/'.

    All candidates are normalised with os.path.normpath(); none is checked
    for existence here.
    """
    # NOTE(review): base.root is presumably the directory exabgp is installed
    # in - confirm against exabgp.environment.base.
    roots = base.root.split(os.sep)
    location = []
    # walk the components right to left to find the last 'lib'
    for index in range(len(roots) - 1, -1, -1):
        if roots[index] == 'lib':
            if index:
                location = roots[:index]
            break
    # os.path.join() needs at least one argument: when no 'lib' component was
    # found (or it is the first one) use an empty prefix instead of crashing.
    prefix = os.path.join(*location) if location else ''
    return [
        os.path.normpath(os.path.join(os.sep, prefix, path)),
        os.path.normpath(os.path.expanduser(unquote(path))),
        os.path.normpath(os.path.join('/', path)),
    ]
193