Completed
Push — master ( 1c4121...07028b )
by Fabio
03:58
created

benedict.utils.io_util.decode_query_string()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 11
rs 9.9
c 0
b 0
f 0
cc 3
nop 2
1
# -*- coding: utf-8 -*-
2
3
from six import string_types
4
5
import base64
6
import errno
7
import json
8
import os
9
import re
10
import requests
11
import xmltodict
12
import toml
13
import yaml
14
15
try:
16
    # python 3
17
    from urllib.parse import unquote_plus as urldecode
18
    from urllib.parse import urlencode
19
    from urllib.parse import parse_qs
20
except ImportError:
21
    # python 2
22
    from urllib import unquote_plus as urldecode
23
    from urllib import urlencode
24
    from urlparse import parse_qs
25
26
27
def decode_base64(s, **kwargs):
28
    # fix urlencoded chars
29
    s = urldecode(s)
30
    # fix padding
31
    m = len(s) % 4
32
    if m != 0:
33
        s += '=' * (4 - m)
34
    encoding = kwargs.pop('encoding', 'utf-8')
35
    data = base64.b64decode(s).decode(encoding)
36
    # decode data if format is specified
37
    decode_format = kwargs.pop('format', 'json')
38
    if decode_format:
39
        decoders = {
40
            'json': decode_json,
41
            'toml': decode_toml,
42
            'yaml': decode_yaml,
43
            'xml': decode_xml,
44
        }
45
        decode_func = decoders.get(decode_format.lower(), None)
46
        if decode_func:
47
            data = decode_func(data, **kwargs)
48
    return data
49
50
51
def decode_json(s, **kwargs):
52
    data = json.loads(s, **kwargs)
53
    return data
54
55
56
def decode_query_string(s, **kwargs):
57
    flat = kwargs.pop('flat', True)
58
    qs_re = r'^(([\w\-\%\+]+\=[\w\-\%\+]*)+([\&]{1})?)+'
59
    qs_pattern = re.compile(qs_re)
60
    if qs_pattern.match(s):
61
        data = parse_qs(s)
62
        if flat:
63
            data = { key:value[0] for key, value in data.items() }
64
        return data
65
    else:
66
        raise ValueError('Invalid query string: {}'.format(s))
67
68
69
def decode_xml(s, **kwargs):
70
    kwargs.setdefault('dict_constructor', dict)
71
    data = xmltodict.parse(s, **kwargs)
72
    return data
73
74
75
def decode_toml(s, **kwargs):
76
    data = toml.loads(s, **kwargs)
77
    return data
78
79
80
def decode_yaml(s, **kwargs):
81
    kwargs.setdefault('Loader', yaml.Loader)
82
    data = yaml.load(s, **kwargs)
83
    return data
84
85
86
def encode_base64(d, **kwargs):
87
    encoding = kwargs.pop('encoding', 'utf-8')
88
    encode_format = kwargs.pop('format', 'json')
89
    if encode_format:
90
        encoders = {
91
            'json': encode_json,
92
            'toml': encode_toml,
93
            'yaml': encode_yaml,
94
            'xml': encode_xml,
95
        }
96
        encode_func = encoders.get(encode_format.lower(), None)
97
        if encode_func:
98
            data = encode_func(d, **kwargs)
99
    if isinstance(data, string_types):
0 ignored issues
show
introduced by
The variable data does not seem to be defined for all execution paths.
Loading history...
100
        data = data.encode(encoding)
101
    data = base64.b64encode(data).decode(encoding)
102
    return data
103
104
105
def encode_json(d, **kwargs):
106
    data = json.dumps(d, **kwargs)
107
    return data
108
109
110
def encode_query_string(d, **kwargs):
111
    data = urlencode(d, **kwargs)
112
    return data
113
114
115
def encode_toml(d, **kwargs):
116
    data = toml.dumps(d, **kwargs)
117
    return data
118
119
120
def encode_xml(d, **kwargs):
121
    data = xmltodict.unparse(d, **kwargs)
122
    return data
123
124
125
def encode_yaml(d, **kwargs):
126
    data = yaml.dump(d, **kwargs)
127
    return data
128
129
130
def read_content(s):
131
    # s -> filepath or url or data
132
    if s.startswith('http://') or s.startswith('https://'):
133
        content = read_url(s)
134
    elif os.path.isfile(s):
135
        content = read_file(s)
136
    else:
137
        content = s
138
    return content
139
140
141
def read_file(filepath):
142
    handler = open(filepath, 'r')
143
    content = handler.read()
144
    handler.close()
145
    return content
146
147
148
def read_url(url, *args, **kwargs):
149
    response = requests.get(url, *args, **kwargs)
150
    content = response.text
151
    return content
152
153
154
def write_file(filepath, content):
155
    # https://stackoverflow.com/questions/12517451/automatically-creating-directories-with-file-output
156
    if not os.path.exists(os.path.dirname(filepath)):
157
        try:
158
            os.makedirs(os.path.dirname(filepath))
159
        except OSError as e:
160
            # Guard against race condition
161
            if e.errno != errno.EEXIST:
162
                raise e
163
    handler = open(filepath, 'w+')
164
    handler.write(content)
165
    handler.close()
166
    return True
167