Passed
Push — master ( 07028b...eb677f )
by Fabio
01:33
created

benedict.utils.io_util.decode_bytes()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
cc 2
nop 4
1
# -*- coding: utf-8 -*-
2
3
from six import string_types
4
5
import base64
6
import errno
7
import json
8
import os
9
import re
10
import requests
11
import xmltodict
12
import toml
13
import yaml
14
15
try:
16
    # python 3
17
    from urllib.parse import unquote
18
    from urllib.parse import unquote_plus
19
    from urllib.parse import urlencode
20
    from urllib.parse import parse_qs
21
except ImportError:
22
    # python 2
23
    from urllib import unquote
24
    from urllib import unquote_plus
25
    from urllib import urlencode
26
    from urlparse import parse_qs
27
28
29
def decode_base64(s, **kwargs):
    """
    Decode a base64 string, optionally decoding the resulting bytes too.

    The input is url-unquoted first, then right-padded with '=' up to a
    multiple of 4 characters, and finally base64-decoded.

    Keyword options (popped from kwargs):
      - encoding: text encoding handed to decode_bytes (default 'utf-8');
        a falsy value makes the function return the raw decoded bytes.
      - format: format name handed to decode_bytes (default None).

    Any other keyword argument is accepted but not used here.
    """
    encoding = kwargs.pop('encoding', 'utf-8')
    fmt = kwargs.pop('format', None)
    # undo url-encoding (e.g. '%3D' -> '=')
    text = unquote(s)
    # restore the '=' padding when the length is not a multiple of 4
    remainder = len(text) % 4
    if remainder:
        text = text + '=' * (4 - remainder)
    raw = base64.b64decode(text)
    if not encoding:
        return raw
    return decode_bytes(raw, encoding=encoding, format=fmt)
44
45
46
def decode_bytes(b, encoding='utf-8', format=None, **kwargs):
    """
    Decode bytes to text and optionally parse the text with a format decoder.

    b is decoded with the given encoding, then cleaned of surrounding
    whitespace and null characters. When format is falsy the cleaned text
    is returned as is; otherwise format must be one of 'json', 'toml',
    'yaml' or 'xml' (an unknown name raises KeyError) and the text is
    passed, together with kwargs, to the matching decoder.
    """
    text = b.decode(encoding)
    # fix trailing null chars: whitespace first, then nulls, then any
    # whitespace that the null removal exposed (order matters)
    text = text.strip()
    text = text.strip('\x00')
    text = text.strip()
    if not format:
        return text
    decoders = {
        'json': decode_json,
        'toml': decode_toml,
        'yaml': decode_yaml,
        'xml': decode_xml,
    }
    return decoders[format](text, **kwargs)
62
63
64
def decode_json(s, **kwargs):
    """Parse the JSON document s; kwargs are forwarded to json.loads."""
    return json.loads(s, **kwargs)
67
68
69
def decode_query_string(s, **kwargs):
    """
    Parse a url query string into a dict.

    The string must start with at least one 'key=value' pair (the pattern
    is only anchored at the beginning), otherwise ValueError is raised.

    Keyword options (popped from kwargs):
      - flat: when true (default) every key maps to its first value only;
        when false every key maps to the list of all its values.
    """
    flat = kwargs.pop('flat', True)
    # one or more 'key=value' pairs, each optionally followed by '&'
    if re.match(r'^(([\w\-\%\+]+\=[\w\-\%\+]*)+([\&]{1})?)+', s) is None:
        raise ValueError('Invalid query string: {}'.format(s))
    parsed = parse_qs(s)
    if not flat:
        return parsed
    return dict((name, values[0]) for name, values in parsed.items())
80
81
82
def decode_xml(s, **kwargs):
    """
    Parse the XML document s with xmltodict.parse.

    kwargs are forwarded to the parser; 'dict_constructor' defaults to the
    builtin dict unless the caller provides one.
    """
    if 'dict_constructor' not in kwargs:
        kwargs['dict_constructor'] = dict
    return xmltodict.parse(s, **kwargs)
86
87
88
def decode_toml(s, **kwargs):
    """Parse the TOML document s; kwargs are forwarded to toml.loads."""
    return toml.loads(s, **kwargs)
91
92
93
def decode_yaml(s, **kwargs):
    """
    Parse the YAML document s with yaml.load.

    kwargs are forwarded to the parser; 'Loader' defaults to yaml.Loader
    unless the caller provides one.
    """
    # NOTE(review): yaml.Loader can construct arbitrary Python objects, so
    # this default is unsafe for untrusted input - callers handling external
    # data should pass Loader=yaml.SafeLoader.
    if 'Loader' not in kwargs:
        kwargs['Loader'] = yaml.Loader
    return yaml.load(s, **kwargs)
97
98
99
def encode_base64(d, **kwargs):
    """
    Serialize d (optionally) and return it as a base64 text string.

    Keyword options (popped from kwargs; the remaining ones are forwarded
    to the serializer):
      - encoding: used to turn serialized text into bytes and to turn the
        base64 bytes back into text (default 'utf-8').
      - format: 'json' (default), 'toml', 'yaml' or 'xml', matched
        case-insensitively; a falsy value skips serialization, in which
        case d itself must already be a string or bytes.

    Raises ValueError when format names an unknown serializer.
    """
    encoding = kwargs.pop('encoding', 'utf-8')
    encode_format = kwargs.pop('format', 'json')
    # start from the raw value so that data is bound on every path
    # (a falsy format used to end in an UnboundLocalError)
    data = d
    if encode_format:
        encoders = {
            'json': encode_json,
            'toml': encode_toml,
            'yaml': encode_yaml,
            'xml': encode_xml,
        }
        encode_func = encoders.get(encode_format.lower())
        if encode_func is None:
            # fail with a clear message instead of an UnboundLocalError
            raise ValueError('Invalid format: {}'.format(encode_format))
        data = encode_func(d, **kwargs)
    # b64encode needs bytes: text is encoded, bytes pass through untouched
    if not isinstance(data, bytes) and isinstance(data, string_types):
        data = data.encode(encoding)
    return base64.b64encode(data).decode(encoding)
116
117
118
def encode_json(d, **kwargs):
    """Serialize d to a JSON string; kwargs are forwarded to json.dumps."""
    return json.dumps(d, **kwargs)
121
122
123
def encode_query_string(d, **kwargs):
    """Build a url query string from d; kwargs are forwarded to urlencode."""
    return urlencode(d, **kwargs)
126
127
128
def encode_toml(d, **kwargs):
    """Serialize d to a TOML string; kwargs are forwarded to toml.dumps."""
    return toml.dumps(d, **kwargs)
131
132
133
def encode_xml(d, **kwargs):
    """Serialize d to XML; kwargs are forwarded to xmltodict.unparse."""
    return xmltodict.unparse(d, **kwargs)
136
137
138
def encode_yaml(d, **kwargs):
    """Serialize d to YAML; kwargs are forwarded to yaml.dump."""
    return yaml.dump(d, **kwargs)
141
142
143
def read_content(s):
    """
    Resolve s to its content.

    s may be a url (starting with 'http://' or 'https://'), in which case
    it is fetched with read_url; the path of an existing file, in which
    case it is read with read_file; or anything else, in which case s
    itself is returned as the content.
    """
    # s -> filepath or url or data
    if s.startswith(('http://', 'https://')):
        return read_url(s)
    if os.path.isfile(s):
        return read_file(s)
    return s
152
153
154
def read_file(filepath):
    """
    Return the whole content of the file at filepath.

    The file is opened in text read mode ('r'). Errors raised by open()
    or read() propagate to the caller.
    """
    # the context manager closes the handle even when read() raises,
    # which the previous open/read/close sequence did not guarantee
    with open(filepath, 'r') as handler:
        return handler.read()
159
160
161
def read_url(url, *args, **kwargs):
    """
    Fetch url with an HTTP GET and return the response body as text.

    Extra positional and keyword arguments are forwarded to requests.get.
    """
    return requests.get(url, *args, **kwargs).text
165
166
167
def write_file(filepath, content):
    """
    Write content to the file at filepath, creating missing directories.

    The file is opened in 'w+' mode, so existing content is replaced.
    Returns True on success; errors raised while creating the directories
    (other than "already exists") or while writing propagate to the caller.
    """
    # https://stackoverflow.com/questions/12517451/automatically-creating-directories-with-file-output
    filedir = os.path.dirname(filepath)
    # dirname is '' for a bare filename: there is nothing to create then,
    # and os.makedirs('') would raise
    if filedir and not os.path.exists(filedir):
        try:
            os.makedirs(filedir)
        except OSError as e:
            # Guard against race condition: another process may have
            # created the directory between the check and makedirs
            if e.errno != errno.EEXIST:
                raise
    # the context manager closes the handle even when write() raises
    with open(filepath, 'w+') as handler:
        handler.write(content)
    return True
181