Completed
Push — master ( eb677f...0fa067 )
by Fabio
04:22
created

benedict.utils.io_util.decode_bytes()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
cc 2
nop 4
1
# -*- coding: utf-8 -*-
2
3
from six import binary_type, string_types
4
5
import base64
6
import errno
7
import json
8
import os
9
import re
10
import requests
11
import xmltodict
12
import toml
13
import yaml
14
15
try:
16
    # python 3
17
    from urllib.parse import unquote
18
    from urllib.parse import unquote_plus
19
    from urllib.parse import urlencode
20
    from urllib.parse import parse_qs
21
except ImportError:
22
    # python 2
23
    from urllib import unquote
24
    from urllib import unquote_plus
25
    from urllib import urlencode
26
    from urlparse import parse_qs
27
28
29
def decode_base64(s, **kwargs):
    """Decode a (possibly url-encoded, possibly unpadded) base64 string.

    Keyword arguments:
        format: optional name of a nested serialization ('json', 'toml',
            'yaml' or 'xml', case-insensitive). When given, the decoded
            text is passed to the matching ``decode_*`` function. An
            unrecognized name is ignored and the decoded text is returned.
        encoding: text encoding used to turn the decoded bytes into a
            string. Defaults to 'utf-8' when ``format`` is given, otherwise
            to None. With a falsy encoding the raw bytes are returned and
            ``format`` is not applied.
        Any remaining keyword arguments are forwarded to the format decoder.

    Returns the decoded bytes, the decoded text, or the object produced by
    the format decoder.
    """
    # fix urlencoded chars
    s = unquote(s)
    # fix padding: base64 input length must be a multiple of 4
    s += '=' * (-len(s) % 4)
    data = base64.b64decode(s)

    # 'fmt' rather than 'format' so the builtin is not shadowed
    fmt = kwargs.pop('format', None)
    encoding = kwargs.pop('encoding', 'utf-8' if fmt else None)
    if not encoding:
        return data

    data = data.decode(encoding)
    if not fmt:
        return data

    decoders = {
        'json': decode_json,
        'toml': decode_toml,
        'yaml': decode_yaml,
        'xml': decode_xml,
    }
    decode_func = decoders.get(fmt.lower())
    if decode_func is not None:
        data = decode_func(data, **kwargs)
    return data
52
53
54
def decode_json(s, **kwargs):
    """Parse the JSON document ``s``; extra kwargs go to ``json.loads``."""
    return json.loads(s, **kwargs)
57
58
59
def decode_query_string(s, **kwargs):
    """Parse a url query string into a dict.

    Keyword arguments:
        flat: when true (the default) each key maps to its first value;
            when false each key maps to the list of all its values.

    Raises ValueError when ``s`` does not start with a ``key=value`` pair.
    The pattern is only matched against the beginning of the string.
    Pairs with an empty value are dropped by ``parse_qs`` (its default).
    """
    flat = kwargs.pop('flat', True)
    qs_pattern = re.compile(r'^(([\w\-\%\+]+\=[\w\-\%\+]*)+([\&]{1})?)+')
    if not qs_pattern.match(s):
        raise ValueError('Invalid query string: {}'.format(s))

    parsed = parse_qs(s)
    if not flat:
        return parsed
    # keep only the first value of every key
    return dict((name, values[0]) for name, values in parsed.items())
70
71
72
def decode_xml(s, **kwargs):
    """Parse the XML document ``s`` with ``xmltodict.parse``.

    Unless the caller supplies ``dict_constructor``, plain ``dict`` is used.
    All keyword arguments are forwarded to ``xmltodict.parse``.
    """
    if 'dict_constructor' not in kwargs:
        kwargs['dict_constructor'] = dict
    return xmltodict.parse(s, **kwargs)
76
77
78
def decode_toml(s, **kwargs):
    """Parse the TOML document ``s``; extra kwargs go to ``toml.loads``."""
    return toml.loads(s, **kwargs)
81
82
83
def decode_yaml(s, **kwargs):
    """Parse the YAML document ``s`` with ``yaml.load``.

    Unless the caller supplies ``Loader``, ``yaml.Loader`` is used.
    All keyword arguments are forwarded to ``yaml.load``.

    NOTE(review): ``yaml.Loader`` is PyYAML's full loader and can build
    arbitrary Python objects from tagged input. Callers handling untrusted
    data should pass ``Loader=yaml.SafeLoader``.
    """
    if 'Loader' not in kwargs:
        kwargs['Loader'] = yaml.Loader
    return yaml.load(s, **kwargs)
87
88
89
def encode_base64(d, **kwargs):
    """Encode ``d`` as base64, optionally serializing it first.

    Keyword arguments:
        format: optional name of a serialization ('json', 'toml', 'yaml'
            or 'xml', case-insensitive) applied when ``d`` is not already
            a string. An unrecognized name is ignored.
        encoding: text encoding used to turn a string into bytes before
            base64-encoding, and to turn the base64 bytes back into a
            string afterwards. Defaults to 'utf-8' when ``format`` is
            given, otherwise to None.
        Any remaining keyword arguments are forwarded to the serializer.

    With a falsy encoding the data is handed to ``base64.b64encode``
    unchanged and its result is returned as-is, so pass ``encoding`` when
    ``d`` is text.
    """
    data = d
    # 'fmt' rather than 'format' so the builtin is not shadowed
    fmt = kwargs.pop('format', None)
    encoding = kwargs.pop('encoding', 'utf-8' if fmt else None)

    # serialize structured data to text first
    if fmt and not isinstance(data, string_types):
        encoders = {
            'json': encode_json,
            'toml': encode_toml,
            'yaml': encode_yaml,
            'xml': encode_xml,
        }
        encode_func = encoders.get(fmt.lower())
        if encode_func is not None:
            data = encode_func(data, **kwargs)

    # text -> bytes for b64encode
    if encoding and isinstance(data, string_types):
        data = data.encode(encoding)
    data = base64.b64encode(data)
    # base64 bytes -> text, mirroring the input encoding
    if encoding and isinstance(data, binary_type):
        data = data.decode(encoding)
    return data
109
110
111
def encode_json(d, **kwargs):
    """Serialize ``d`` to a JSON string; extra kwargs go to ``json.dumps``."""
    return json.dumps(d, **kwargs)
114
115
116
def encode_query_string(d, **kwargs):
    """Serialize ``d`` to a url query string; extra kwargs go to ``urlencode``."""
    return urlencode(d, **kwargs)
119
120
121
def encode_toml(d, **kwargs):
    """Serialize ``d`` to a TOML string; extra kwargs go to ``toml.dumps``."""
    return toml.dumps(d, **kwargs)
124
125
126
def encode_xml(d, **kwargs):
    """Serialize ``d`` to XML; extra kwargs go to ``xmltodict.unparse``."""
    return xmltodict.unparse(d, **kwargs)
129
130
131
def encode_yaml(d, **kwargs):
    """Serialize ``d`` to YAML; extra kwargs go to ``yaml.dump``."""
    return yaml.dump(d, **kwargs)
134
135
136
def read_content(s):
    """Resolve ``s`` to its content.

    ``s`` may be a url, a file path or the data itself:
    - a string starting with 'http://' or 'https://' is fetched via read_url;
    - a path to an existing file is read via read_file;
    - anything else is returned unchanged.
    """
    if s.startswith(('http://', 'https://')):
        return read_url(s)
    if os.path.isfile(s):
        return read_file(s)
    return s
145
146
147
def read_file(filepath):
    """Return the whole content of the text file at ``filepath``.

    The file is opened in text read mode ('r'). The context manager
    guarantees the handle is closed even when reading raises.
    """
    with open(filepath, 'r') as handler:
        return handler.read()
152
153
154
def read_url(url, *args, **kwargs):
    """Fetch ``url`` with ``requests.get`` and return the response text.

    Positional and keyword arguments are forwarded to ``requests.get``.

    NOTE(review): the response status is not checked here, so the body is
    returned whatever status the server answered with.
    """
    return requests.get(url, *args, **kwargs).text
158
159
160
def write_file(filepath, content):
    """Write ``content`` to ``filepath``, creating parent directories.

    The file is opened in 'w+' mode, so existing content is truncated.
    Returns True on success. Raises OSError when the parent directory
    cannot be created for a reason other than it already existing.
    """
    # https://stackoverflow.com/questions/12517451/automatically-creating-directories-with-file-output
    filedir = os.path.dirname(filepath)
    # A bare filename has an empty dirname: nothing to create, and
    # os.makedirs('') would raise.
    if filedir and not os.path.exists(filedir):
        try:
            os.makedirs(filedir)
        except OSError as e:
            # Guard against race condition
            if e.errno != errno.EEXIST:
                raise
    # The context manager closes the handle even when write() raises.
    with open(filepath, 'w+') as handler:
        handler.write(content)
    return True
174