Passed: Push to master (7bc1c2...5b857f) by Amin, created 03:31

ffmpeg_streaming._clouds.Clouds.download()   Rating: A

Complexity
  Conditions: 1

Size
  Total Lines: 3
  Code Lines: 3

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric  Value
cc      1
eloc    3
nop     3
dl      0
loc     3
rs      10
c       0
b       0
f       0
"""
ffmpeg_streaming.clouds
~~~~~~~~~~~~~~~~~~~~~~~

Upload and download files to/from cloud storage services.


:copyright: (c) 2020 by Amin Yazdanpanah.
:website: https://www.aminyazdanpanah.com
:email: [email protected]
:license: MIT, see LICENSE for more details.
"""

import abc
import logging
import tempfile
from os import listdir
from os.path import isfile, join


class Clouds(abc.ABC):
    """Abstract base class every cloud backend must implement."""

    @abc.abstractmethod
    def upload_directory(self, directory: str, **options) -> None:
        pass

    @abc.abstractmethod
    def download(self, filename: str = None, **options) -> str:
        pass


class S3(Clouds):
    def __init__(self, **options):
        try:
            import boto3
            from botocore.exceptions import ClientError
        except ImportError as e:
            raise ImportError("boto3 is not installed! Install the package via pip:\n\n"
                              "pip install boto3") from e

        self.s3 = boto3.client('s3', **options)
        self.err = ClientError

    def upload_directory(self, directory, **options):
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        # Upload every regular file in the directory (non-recursive).
        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                self.s3.upload_file(join(directory, file), bucket_name, file)
        except self.err as e:
            logging.error(e)
            raise RuntimeError(e)

        logging.info("The " + directory + " directory was uploaded to Amazon S3 successfully")

    def download(self, filename=None, **options):
        bucket_name = options.pop('bucket_name', None)
        key = options.pop('key', None)

        if bucket_name is None or key is None:
            raise ValueError('You should pass a bucket name and a key name')

        # Download into a named temporary file when no destination is given.
        if filename is None:
            filename = tempfile.NamedTemporaryFile(suffix='_' + key + '_py_ff_vi_st.tmp', delete=False)
        else:
            filename = open(filename, 'wb')

        try:
            with filename as f:
                self.s3.download_fileobj(bucket_name, key, f)
            logging.info("The " + filename.name + " file was downloaded")
        except self.err as e:
            logging.error(e)
            raise RuntimeError(e)

        return filename.name


class GCS(Clouds):
    def __init__(self, **options):
        try:
            from google.cloud import storage
        except ImportError as e:
            raise ImportError("google-cloud-storage is not installed! Install the package via pip:\n\n"
                              "pip install google-cloud-storage") from e
        self.client = storage.Client(**options)

    def upload_directory(self, directory, **options):
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        bucket = self.client.get_bucket(bucket_name)

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        # The object key is the bucket name followed by the file name.
        for file in files:
            blob = bucket.blob(bucket_name + file, **options)
            blob.upload_from_filename(join(directory, file))

    def download(self, filename=None, **options):
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        bucket = self.client.get_bucket(bucket_name)

        if filename is None:
            with tempfile.NamedTemporaryFile(suffix='_py_ff_vi_st.tmp', delete=False) as tmp:
                filename = tmp.name

        object_name = options.pop('object_name', None)
        if object_name is None:
            raise ValueError('You should pass an object name')

        blob = bucket.get_blob(object_name, **options)
        blob.download_to_filename(filename)

        return filename


class MAS(Clouds):
    def __init__(self, **options):
        try:
            from azure.storage.blob import BlockBlobService
        except ImportError as e:
            raise ImportError("azure-storage-blob is not installed! Install the package via pip:\n\n"
                              "pip install azure-storage-blob") from e
        self.block_blob_service = BlockBlobService(**options)

    def upload_directory(self, directory, **options):
        container = options.pop('container', None)
        if container is None:
            raise ValueError('You should pass a container name')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                self.block_blob_service.create_blob_from_path(container, file, join(directory, file))
        except Exception:
            error = "An error occurred while uploading the directory"
            logging.error(error)
            raise RuntimeError(error)

    def download(self, filename=None, **options):
        container = options.pop('container', None)
        blob = options.pop('blob', None)

        if container is None or blob is None:
            raise ValueError('You should pass a container name and a blob name')

        if filename is None:
            with tempfile.NamedTemporaryFile(suffix='_py_ff_vi_st.tmp', delete=False) as tmp:
                filename = tmp.name

        try:
            self.block_blob_service.get_blob_to_path(container, blob, filename)
            logging.info("The " + filename + " file was downloaded")
        except Exception:
            error = "An error occurred while downloading the file"
            logging.error(error)
            raise RuntimeError(error)

        return filename


class CloudManager:
    """Collects cloud backends and runs the same transfer on all of them."""

    def __init__(self):
        self.clouds = []

    def add(self, cloud: Clouds, **options):
        self.clouds.append((cloud, options))
        return self

    def transfer(self, method, path):
        for cloud, options in self.clouds:
            getattr(cloud, method)(path, **options)


__all__ = [
    'Clouds',
    'CloudManager',
    'S3',
    'GCS',
    'MAS'
]
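
For context on how these classes fit together, here is a minimal usage sketch. It is not taken from this build: the credentials, bucket name, and file paths are placeholders, and it assumes the classes are importable from ffmpeg_streaming._clouds, as the method path in the report header suggests.

# Minimal sketch; all names and credentials below are hypothetical.
from ffmpeg_streaming._clouds import S3, CloudManager

# Credentials are passed straight through to boto3.client('s3', ...).
s3 = S3(aws_access_key_id='YOUR_KEY',
        aws_secret_access_key='YOUR_SECRET',
        region_name='us-east-1')

# Register the backend together with the options its methods expect.
clouds = CloudManager().add(s3, bucket_name='my-bucket')

# Upload every file in a local directory to each registered cloud.
clouds.transfer('upload_directory', '/tmp/hls')

# Download a single object; S3.download() needs bucket_name and key and
# returns the local path (a temporary file when no filename is given).
local_path = s3.download(bucket_name='my-bucket', key='master.m3u8')
print(local_path)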