fabiocaccamo /
python-benedict
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | |||
| 3 | from benedict.serializers import ( |
||
| 4 | get_format_by_path, |
||
| 5 | get_serializer_by_format, |
||
| 6 | ) |
||
| 7 | |||
| 8 | # from botocore.exceptions import ClientError |
||
| 9 | from urllib.parse import urlparse |
||
| 10 | |||
| 11 | import boto3 |
||
| 12 | import fsutil |
||
| 13 | import tempfile |
||
| 14 | |||
| 15 | |||
def autodetect_format(s):
    """Return the format inferred from *s* when it looks like a location.

    *s* counts as a location when it is recognised by ``is_url``, ``is_s3``
    or ``is_filepath``; the format is then whatever ``get_format_by_path``
    returns for it.  Anything else yields ``None``.
    """
    # Short-circuit: the checks run in order and stop at the first match, so
    # the filesystem lookup inside is_filepath is skipped for URLs / S3 URLs.
    if is_url(s) or is_s3(s) or is_filepath(s):
        return get_format_by_path(s)
    return None
| 20 | |||
| 21 | |||
def decode(s, format, **kwargs):
    """Decode *s* (inline data, or a location to read it from) as *format*.

    Extra keyword arguments are forwarded both to ``read_content`` and to the
    serializer's ``decode``.

    Raises:
        ValueError: if no serializer is registered for *format*.
    """
    codec = get_serializer_by_format(format)
    if not codec:
        raise ValueError(f"Invalid format: {format}.")
    opts = dict(kwargs)
    if format in ("b64", "base64"):
        # Base64 payloads wrap another format; default the inner one to json
        # unless the caller chose a subformat explicitly.
        opts.setdefault("subformat", "json")
    raw = read_content(s, format, **opts)
    return codec.decode(raw, **opts)
| 32 | |||
| 33 | |||
def encode(d, format, filepath=None, **kwargs):
    """Encode *d* with the serializer for *format* and return the result.

    When *filepath* is truthy the encoded content is also written there via
    ``write_content``.  Extra keyword arguments are forwarded to the
    serializer's ``encode`` and to ``write_content``.

    Raises:
        ValueError: if no serializer is registered for *format*.
    """
    codec = get_serializer_by_format(format)
    if not codec:
        raise ValueError(f"Invalid format: {format}.")
    opts = dict(kwargs)
    encoded = codec.encode(d, **opts)
    if filepath:
        write_content(filepath, encoded, **opts)
    return encoded
| 43 | |||
| 44 | |||
def is_binary_format(format):
    """Return True when *format* is one of the spreadsheet formats below."""
    binary_formats = ("xls", "xlsx", "xlsm")
    return format in binary_formats
| 51 | |||
| 52 | |||
def is_data(s):
    """Return True when *s* spans more than one line.

    Multi-line input is treated as inline data rather than a path or URL.
    """
    lines = s.splitlines()
    return len(lines) > 1
| 55 | |||
| 56 | |||
def is_filepath(s):
    """Return a truthy value when *s* looks like a file path.

    That is the case when ``fsutil.is_file`` accepts it, or failing that,
    when ``get_format_by_path`` returns something truthy for it.  The result
    is whichever of those two values decided the outcome, not necessarily a
    bool.
    """
    existing = fsutil.is_file(s)
    if existing:
        return existing
    return get_format_by_path(s)
| 59 | |||
| 60 | |||
def is_s3(s):
    """Return a truthy value when *s* is an ``s3://`` URL with a known format.

    Returns ``False`` when the scheme prefix is missing; otherwise returns
    whatever ``get_format_by_path`` yields for *s* (not necessarily a bool).
    """
    has_scheme = s.startswith("s3://")
    if not has_scheme:
        return has_scheme
    return get_format_by_path(s)
| 63 | |||
| 64 | |||
def is_url(s):
    """Return True when *s* starts with ``http://`` or ``https://``."""
    # str.startswith accepts a tuple of prefixes, so no list or any() needed.
    return s.startswith(("http://", "https://"))
| 67 | |||
| 68 | |||
def parse_s3_url(url):
    """Split an ``s3://bucket/key`` URL into its parts.

    Returns:
        dict with three string entries:
        ``"url"`` (the URL as re-assembled by ``urlparse(...).geturl()``),
        ``"bucket"`` (the network-location part) and
        ``"key"`` (the path without leading slashes, plus ``?query`` if any).
    """
    # allow_fragments=False keeps any '#...' suffix inside the path, so it
    # ends up as part of the key instead of being split off.
    parsed = urlparse(url, allow_fragments=False)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    if parsed.query:
        # urlparse moved everything after '?' into .query; put it back so the
        # key matches what appeared in the URL.  (The original referenced
        # self._parsed here, which is undefined in a module-level function.)
        key += "?" + parsed.query
    return {
        "url": parsed.geturl(),
        "bucket": bucket,
        "key": key,
    }
| 81 | |||
| 82 | |||
def read_content(s, format=None, **options):
    """Resolve *s* to content: *s* may be inline data, a URL, an S3 URL or a path.

    *s* is stripped first.  Multi-line input is returned as-is; otherwise the
    first matching reader (URL, then S3, then file path) is called with the
    stripped value and *options* (with ``format`` filled in if absent).  When
    nothing matches, the stripped string itself is returned.
    """
    options.setdefault("format", format)
    text = s.strip()
    if is_data(text):
        return text
    # Order matters: URL is checked before S3, S3 before plain file path.
    readers = (
        (is_url, read_content_from_url),
        (is_s3, read_content_from_s3),
        (is_filepath, read_content_from_file),
    )
    for matches, reader in readers:
        if matches(text):
            return reader(text, **options)
    # Nothing matched: treat the input as one-line inline data.
    return text
| 97 | |||
| 98 | |||
def read_content_from_file(filepath, format=None, **options):
    """Return the content of *filepath*, or the path itself for binary formats.

    For formats accepted by ``is_binary_format`` the file is not read here and
    *filepath* is returned unchanged; otherwise the result of
    ``fsutil.read_file(filepath)`` is returned.  *options* is accepted but
    unused.
    """
    if not is_binary_format(format):
        return fsutil.read_file(filepath)
    return filepath
| 104 | |||
| 105 | |||
def read_content_from_s3(url, s3_options=None, format=None, **options):
    """Download the object at the S3 *url* to the temp dir and read it.

    Args:
        url: ``s3://bucket/key`` URL, parsed with ``parse_s3_url``.
        s3_options: keyword arguments forwarded to ``boto3.client("s3", ...)``.
            Optional; ``None`` means no extra arguments, mirroring how
            ``read_content_from_url`` treats ``requests_options``.
        format: forwarded to ``read_content_from_file``.
        **options: forwarded to ``read_content_from_file``.

    Returns:
        Whatever ``read_content_from_file`` returns for the downloaded file.
    """
    s3_options = s3_options or {}
    s3_url = parse_s3_url(url)
    filename = fsutil.get_filename(s3_url["key"])
    filepath = fsutil.join_path(tempfile.gettempdir(), filename)
    s3 = boto3.client("s3", **s3_options)
    try:
        s3.download_file(s3_url["bucket"], s3_url["key"], filepath)
    finally:
        # Close the client even when the download raises.
        s3.close()
    # The downloaded file is left in place: for binary formats
    # read_content_from_file hands back the path rather than the content.
    return read_content_from_file(filepath, format, **options)
| 116 | |||
| 117 | |||
def read_content_from_url(url, requests_options=None, format=None, **options):
    """Fetch *url* and return its content, or a downloaded path if binary.

    For formats accepted by ``is_binary_format`` the URL is downloaded into
    the system temp directory with ``fsutil.download_file`` and that call's
    result is returned; otherwise the result of
    ``fsutil.read_file_from_url`` is returned.  *requests_options* (default:
    none) is forwarded as keyword arguments to the fsutil call; *options* is
    accepted but unused.
    """
    request_kwargs = requests_options or {}
    if not is_binary_format(format):
        return fsutil.read_file_from_url(url, **request_kwargs)
    return fsutil.download_file(url, tempfile.gettempdir(), **request_kwargs)
| 126 | |||
| 127 | |||
def write_content(filepath, content, **options):
    """Write *content* to *filepath*, routing S3 URLs to the S3 writer.

    *options* is forwarded unchanged to the selected writer.
    """
    writer = write_content_to_s3 if is_s3(filepath) else write_content_to_file
    writer(filepath, content, **options)
| 133 | |||
| 134 | |||
def write_content_to_file(filepath, content, **options):
    """Write *content* to the local *filepath* via ``fsutil.write_file``.

    *options* is accepted so this writer can be called like the S3 one, but
    it is not used here.
    """
    fsutil.write_file(filepath, content)
| 137 | |||
| 138 | |||
def write_content_to_s3(url, content, s3_options=None, **options):
    """Upload *content* to the S3 *url* by way of a temporary local file.

    Args:
        url: ``s3://bucket/key`` URL, parsed with ``parse_s3_url``.
        content: data written to the temporary file with ``fsutil.write_file``.
        s3_options: keyword arguments forwarded to ``boto3.client("s3", ...)``.
            Optional; ``None`` means no extra arguments.
        **options: accepted but unused.
    """
    s3_options = s3_options or {}
    s3_url = parse_s3_url(url)
    filename = fsutil.get_filename(s3_url["key"])
    filepath = fsutil.join_path(tempfile.gettempdir(), filename)
    fsutil.write_file(filepath, content)
    try:
        s3 = boto3.client("s3", **s3_options)
        try:
            s3.upload_file(filepath, s3_url["bucket"], s3_url["key"])
        finally:
            # Close the client even when the upload raises.
            s3.close()
    finally:
        # Remove the staging file whether or not the upload succeeded.
        fsutil.remove_file(filepath)
| 149 |