benedict.dicts.io.io_util   A
last analyzed

Complexity

Total Complexity 29

Size/Duplication

Total Lines 150
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 108
dl 0
loc 150
rs 10
c 0
b 0
f 0
wmc 29

16 Functions

Rating   Name   Duplication   Size   Complexity  
A write_content_to_s3() 0 10 1
A encode() 0 10 3
A read_content_from_url() 0 7 2
A read_content() 0 17 5
A is_url() 0 2 1
A is_data() 0 2 1
A write_content() 0 5 2
A write_content_to_file() 0 2 1
A parse_s3_url() 0 11 2
A decode() 0 11 3
A read_content_from_file() 0 5 2
A is_s3() 0 2 1
A read_content_from_s3() 0 10 1
A is_binary_format() 0 5 1
A autodetect_format() 0 5 2
A is_filepath() 0 2 1
1
import tempfile
2
3
# from botocore.exceptions import ClientError
4
from urllib.parse import urlparse
5
6
import boto3
7
import fsutil
8
9
from benedict.serializers import get_format_by_path, get_serializer_by_format
10
11
12
def autodetect_format(s):
    """Guess the serialization format of *s* from its path-like shape.

    *s* is converted to ``str`` first. When it looks like a URL, an S3 URL
    or a filepath, the result of ``get_format_by_path`` is returned;
    otherwise ``None``.
    """
    candidate = str(s)
    # All three probes are evaluated up front (no short-circuit).
    probes = [is_url(candidate), is_s3(candidate), is_filepath(candidate)]
    if not any(probes):
        return None
    return get_format_by_path(candidate)
17
18
19
def decode(s, format, **kwargs):
    """Decode *s* (data, filepath, URL or S3 URL) using the given format.

    Args:
        s: The source; it is converted to ``str`` before use.
        format: Name of the format used to look up the serializer.
        **kwargs: Options forwarded to ``read_content`` and to the
            serializer's ``decode``.

    Returns:
        Whatever the serializer's ``decode`` returns.

    Raises:
        ValueError: If no serializer is registered for *format*.
    """
    source = str(s)
    serializer = get_serializer_by_format(format)
    if not serializer:
        raise ValueError(f"Invalid format: {format}.")
    # Work on a copy so the caller's kwargs are never mutated.
    options = dict(kwargs)
    if format in ("b64", "base64"):
        options.setdefault("subformat", "json")
    # read_content may pop transport-specific entries out of `options`,
    # so it must run before `options` is expanded for the serializer.
    content = read_content(source, format, options)
    return serializer.decode(content, **options)
30
31
32
def encode(d, format, filepath=None, **kwargs):
    """Encode *d* with the serializer for *format*, optionally persisting it.

    Args:
        d: The object to encode.
        format: Name of the format used to look up the serializer.
        filepath: Optional destination; when truthy, it is converted to
            ``str`` and the encoded content is written via ``write_content``.
        **kwargs: Options forwarded to the serializer's ``encode`` and to
            ``write_content``.

    Returns:
        The encoded content.

    Raises:
        ValueError: If no serializer is registered for *format*.
    """
    serializer = get_serializer_by_format(format)
    if not serializer:
        raise ValueError(f"Invalid format: {format}.")
    options = dict(kwargs)
    encoded = serializer.encode(d, **options)
    if not filepath:
        return encoded
    write_content(str(filepath), encoded, **options)
    return encoded
42
43
44
def is_binary_format(format):
    """Return ``True`` when *format* is one of the spreadsheet formats
    ``xls``, ``xlsx`` or ``xlsm``; ``False`` otherwise."""
    binary_formats = ("xls", "xlsx", "xlsm")
    return format in binary_formats
50
51
52
def is_data(s):
    """Return ``True`` when *s* spans more than one line."""
    line_count = len(s.splitlines())
    return line_count > 1
54
55
56
def is_filepath(s):
    """Tell whether *s* looks like a filepath.

    Returns the (truthy) result of ``fsutil.is_file(s)`` when it is truthy;
    otherwise returns whatever ``get_format_by_path(s)`` yields, so the
    result is truthy/falsy rather than strictly boolean.
    """
    existing = fsutil.is_file(s)
    if existing:
        return existing
    return get_format_by_path(s)
58
59
60
def is_s3(s):
    """Tell whether *s* is an ``s3://`` URL whose path maps to a format.

    Returns ``False`` when the prefix does not match; otherwise returns the
    result of ``get_format_by_path(s)`` (truthy/falsy, not strictly bool).
    """
    if not s.startswith("s3://"):
        return False
    return get_format_by_path(s)
62
63
64
def is_url(s):
    """Return ``True`` when *s* starts with ``http://`` or ``https://``."""
    return s.startswith(("http://", "https://"))
66
67
68
def parse_s3_url(url):
    """Split an S3 URL into its normalized URL, bucket and object key.

    The URL is parsed with fragments disabled. The bucket is the network
    location, and the key is the path without leading slashes. A query
    string, when present, is kept as part of the key.

    Returns:
        A dict with the keys ``"url"``, ``"bucket"`` and ``"key"``.
    """
    parts = urlparse(url, allow_fragments=False)
    object_key = parts.path.lstrip("/")
    if parts.query:
        object_key = object_key + "?" + parts.query
    return {
        "url": parts.geturl(),
        "bucket": parts.netloc,
        "key": object_key,
    }
80
81
82
def read_content(s, format=None, options=None):
    """Resolve *s* to content, fetching it when it is a location.

    *s* is stripped, then classified in this order: multi-line data, HTTP(S)
    URL, S3 URL, filepath. Anything else is treated as one-line data and
    returned as-is (stripped).

    Args:
        s: Raw data, or a URL / S3 URL / filepath pointing to the data.
        format: Format name forwarded to the transport-specific readers.
        options: Optional dict; ``"requests_options"`` or ``"s3_options"``
            is popped from it (mutating the dict) when the matching
            transport is used.
    """
    options = options or {}
    source = s.strip()
    if is_data(source):
        return source
    if is_url(source):
        requests_options = options.pop("requests_options", None) or {}
        return read_content_from_url(source, requests_options, format)
    if is_s3(source):
        s3_options = options.pop("s3_options", None) or {}
        return read_content_from_s3(source, s3_options, format)
    if is_filepath(source):
        return read_content_from_file(source, format)
    # Nothing matched: assume the input is one-line data.
    return source
99
100
101
def read_content_from_file(filepath, format=None):
    """Return the file's content, or the path itself for binary formats.

    For binary formats (see ``is_binary_format``) the filepath is returned
    untouched instead of being read here.
    """
    if not is_binary_format(format):
        return fsutil.read_file(filepath)
    return filepath
106
107
108
def read_content_from_s3(url, s3_options, format=None):
    """Download an S3 object to the temp directory and return its content.

    Args:
        url: The ``s3://`` URL of the object.
        s3_options: Keyword arguments for ``boto3.client("s3", ...)``;
            ``None`` is treated as no options.
        format: Format name forwarded to ``read_content_from_file``.

    Returns:
        The result of ``read_content_from_file`` on the downloaded file.
    """
    s3_url = parse_s3_url(url)
    dirpath = tempfile.gettempdir()
    filename = fsutil.get_filename(s3_url["key"])
    filepath = fsutil.join_path(dirpath, filename)
    s3 = boto3.client("s3", **(s3_options or {}))
    try:
        s3.download_file(s3_url["bucket"], s3_url["key"], filepath)
    finally:
        # Always release the client, even when the download fails.
        s3.close()
    # The downloaded file is intentionally kept: for binary formats
    # read_content_from_file returns its path rather than its content.
    return read_content_from_file(filepath, format)
118
119
120
def read_content_from_url(url, requests_options, format=None):
    """Fetch the content behind *url*.

    For binary formats (see ``is_binary_format``) the resource is downloaded
    into the system temp directory via ``fsutil.download_file`` and that
    call's result is returned; otherwise the result of
    ``fsutil.read_file_from_url`` is returned. *requests_options* is
    expanded as keyword arguments for either call.
    """
    if not is_binary_format(format):
        return fsutil.read_file_from_url(url, **requests_options)
    download_dir = tempfile.gettempdir()
    return fsutil.download_file(url, download_dir, **requests_options)
127
128
129
def write_content(filepath, content, **options):
    """Write *content* to *filepath*, routing S3 URLs to the S3 writer.

    All *options* are forwarded to the selected writer.
    """
    writer = write_content_to_s3 if is_s3(filepath) else write_content_to_file
    writer(filepath, content, **options)
134
135
136
def write_content_to_file(filepath, content, **options):
    """Write *content* to the local file at *filepath*.

    Extra *options* are accepted so the signature matches the other
    writers, but they are not used here.
    """
    fsutil.write_file(filepath, content)
138
139
140
def write_content_to_s3(url, content, s3_options=None, **options):
    """Upload *content* to the S3 object addressed by *url*.

    The content is first written to a file in the system temp directory,
    uploaded from there, and the temp file is removed afterwards.

    Args:
        url: The ``s3://`` URL of the destination object.
        content: The content to upload.
        s3_options: Keyword arguments for ``boto3.client("s3", ...)``;
            optional, ``None`` is treated as no options (as on the read
            path).
        **options: Accepted for signature parity with the other writers;
            not used here.
    """
    s3_url = parse_s3_url(url)
    dirpath = tempfile.gettempdir()
    filename = fsutil.get_filename(s3_url["key"])
    filepath = fsutil.join_path(dirpath, filename)
    fsutil.write_file(filepath, content)
    try:
        s3 = boto3.client("s3", **(s3_options or {}))
        try:
            s3.upload_file(filepath, s3_url["bucket"], s3_url["key"])
        finally:
            # Always release the client, even when the upload fails.
            s3.close()
    finally:
        # Never leave the staging file behind, whatever happened above.
        fsutil.remove_file(filepath)
150