Passed
Push — master ( a1e5b9...5ac299 )
by Fabio
01:25
created

benedict.dicts.io.io_util.write_file()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# -*- coding: utf-8 -*-
2
3
from benedict.serializers import (
4
    get_format_by_path,
5
    get_serializer_by_format,
6
)
7
8
# from botocore.exceptions import ClientError
9
from urllib.parse import urlparse
10
11
import boto3
12
import fsutil
13
import tempfile
14
15
16
def autodetect_format(s):
17
    if any([is_url(s), is_s3(s), is_filepath(s)]):
18
        return get_format_by_path(s)
19
    return None
20
21
22
def decode(s, format, **kwargs):
23
    serializer = get_serializer_by_format(format)
24
    if not serializer:
25
        raise ValueError(f"Invalid format: {format}.")
26
    options = kwargs.copy()
27
    if format in ["b64", "base64"]:
28
        options.setdefault("subformat", "json")
29
    content = read_content(s, format, **options)
30
    data = serializer.decode(content, **options)
31
    return data
32
33
34
def encode(d, format, filepath=None, **kwargs):
35
    serializer = get_serializer_by_format(format)
36
    if not serializer:
37
        raise ValueError(f"Invalid format: {format}.")
38
    options = kwargs.copy()
39
    content = serializer.encode(d, **options)
40
    if filepath:
41
        write_content(filepath, content, **options)
42
    return content
43
44
45
def is_binary_format(format):
46
    return format in [
47
        "xls",
48
        "xlsx",
49
        "xlsm",
50
    ]
51
52
53
def is_data(s):
54
    return len(s.splitlines()) > 1
55
56
57
def is_filepath(s):
58
    return fsutil.is_file(s) or get_format_by_path(s)
59
60
61
def is_s3(s):
62
    return s.startswith("s3://") and get_format_by_path(s)
63
64
65
def is_url(s):
66
    return any([s.startswith(protocol) for protocol in ["http://", "https://"]])
67
68
69
def parse_s3_url(url):
70
    parsed = urlparse(url, allow_fragments=False)
71
    bucket = parsed.netloc
72
    key = parsed.path.lstrip("/")
73
    if parsed.query:
74
        key += "?" + self._parsed.query
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable self does not seem to be defined.
Loading history...
75
    url = parsed.geturl()
76
    return {
77
        "url": url,
78
        "bucket": bucket,
79
        "key": key,
80
    }
81
82
83
def read_content(s, format=None, **options):
84
    # s -> filepath or url or data
85
    options.setdefault("format", format)
86
    s = s.strip()
87
    if is_data(s):
88
        return s
89
    elif is_url(s):
90
        return read_content_from_url(s, **options)
91
    elif is_s3(s):
92
        return read_content_from_s3(s, **options)
93
    elif is_filepath(s):
94
        return read_content_from_file(s, **options)
95
    # one-line data?!
96
    return s
97
98
99
def read_content_from_file(filepath, format=None, **options):
100
    binary_format = is_binary_format(format)
101
    if binary_format:
102
        return filepath
103
    return fsutil.read_file(filepath)
104
105
106
def read_content_from_s3(url, s3_options, format=None, **options):
107
    s3_url = parse_s3_url(url)
108
    dirpath = tempfile.gettempdir()
109
    filename = fsutil.get_filename(s3_url["key"])
110
    filepath = fsutil.join_path(dirpath, filename)
111
    s3 = boto3.client("s3", **s3_options)
112
    s3.download_file(s3_url["bucket"], s3_url["key"], filepath)
113
    s3.close()
114
    content = read_content_from_file(filepath, format, **options)
115
    return content
116
117
118
def read_content_from_url(url, requests_options=None, format=None, **options):
119
    requests_options = requests_options or {}
120
    binary_format = is_binary_format(format)
121
    if binary_format:
122
        dirpath = tempfile.gettempdir()
123
        filepath = fsutil.download_file(url, dirpath, **requests_options)
124
        return filepath
125
    return fsutil.read_file_from_url(url, **requests_options)
126
127
128
def write_content(filepath, content, **options):
129
    if is_s3(filepath):
130
        write_content_to_s3(filepath, content, **options)
131
    else:
132
        write_content_to_file(filepath, content, **options)
133
134
135
def write_content_to_file(filepath, content, **options):
136
    fsutil.write_file(filepath, content)
137
138
139
def write_content_to_s3(url, content, s3_options, **options):
140
    s3_url = parse_s3_url(url)
141
    dirpath = tempfile.gettempdir()
142
    filename = fsutil.get_filename(s3_url["key"])
143
    filepath = fsutil.join_path(dirpath, filename)
144
    fsutil.write_file(filepath, content)
145
    s3 = boto3.client("s3", **s3_options)
146
    s3.upload_file(filepath, s3_url["bucket"], s3_url["key"])
147
    s3.close()
148
    fsutil.remove_file(filepath)
149