ffmpeg_streaming._clouds   A
last analyzed

Complexity

Total Complexity 40

Size/Duplication

Total Lines 209
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 134
dl 0
loc 209
rs 9.2
c 0
b 0
f 0
wmc 40

14 Methods

Rating   Name   Duplication   Size   Complexity  
A Clouds.upload_directory() 0 3 1
A Clouds.download() 0 3 1
A S3.__init__() 0 13 2
A S3.upload_directory() 0 16 4
B S3.download() 0 21 6
A GCS.__init__() 0 10 2
A CloudManager.__init__() 0 6 1
B MAS.download() 0 20 6
A CloudManager.add() 0 3 1
A CloudManager.transfer() 0 3 2
A GCS.download() 0 19 5
A MAS.upload_directory() 0 14 4
A GCS.upload_directory() 0 12 3
A MAS.__init__() 0 10 2

How to fix   Complexity   

Complexity

Complex classes like ffmpeg_streaming._clouds often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
ffmpeg_streaming.clouds
3
~~~~~~~~~~~~
4
5
Upload files to, and download files from, cloud storage services
6
7
8
:copyright: (c) 2020 by Amin Yazdanpanah.
9
:website: https://www.aminyazdanpanah.com
10
:email: [email protected]
11
:license: MIT, see LICENSE for more details.
12
"""
13
14
import abc
15
import logging
16
import tempfile
17
from os import listdir
18
from os.path import isfile, join, basename
19
20
21
class Clouds(abc.ABC):
    """
    Abstract interface implemented by every cloud storage backend.

    A backend must provide both methods below; the accepted keyword
    options are specific to each backend.
    """

    @abc.abstractmethod
    def upload_directory(self, directory: str, **options) -> None:
        """Upload the content of ``directory`` to the cloud storage."""

    @abc.abstractmethod
    def download(self, filename: str = None, **options) -> str:
        """Download an object into ``filename`` and return the local path."""
32
33
34
class S3(Clouds):
    """
    Amazon S3 backend that transfers files through a ``boto3`` S3 client.
    """

    def __init__(self, **options):
        """
        Create the ``boto3`` S3 client used by every transfer.

        :param options: keyword arguments passed unchanged to ``boto3.client('s3', ...)``
        :raises ImportError: when ``boto3`` or ``botocore`` cannot be imported
        """
        try:
            import boto3
            from botocore.exceptions import ClientError
        except ImportError as e:
            raise ImportError("No specified import name! make sure that you have installed the package via pip:\n\n"
                              "pip install boto3") from e

        self.s3 = boto3.client('s3', **options)
        # botocore is imported lazily, so the exception class is kept on the instance for later handlers.
        self.err = ClientError

    def upload_directory(self, directory, **options):
        """
        Upload every regular file located directly inside ``directory`` (sub-directories are skipped).

        :param directory: local directory whose files are uploaded
        :param options: ``bucket_name`` (required) and ``folder`` (optional key prefix, default ``''``)
        :raises ValueError: when ``bucket_name`` is missing
        :raises RuntimeError: when the S3 client reports an error; the client error is the ``__cause__``
        """
        bucket_name = options.pop('bucket_name', None)
        folder = options.pop('folder', '')
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                # Object keys always use forward slashes, even when join() produced backslashes.
                self.s3.upload_file(join(directory, file), bucket_name, join(folder, file).replace("\\", "/"))
        except self.err as e:
            logging.error(e)
            raise RuntimeError(e) from e

        logging.info("The %s directory was uploaded to Amazon S3 successfully", directory)

    def download(self, filename=None, **options):
        """
        Download one object and return the path of the local file.

        :param filename: destination path; when ``None`` a temporary file prefixed with the key's
            base name is created and kept on disk
        :param options: ``bucket_name`` and ``key`` (both required)
        :return: the name of the file that was written
        :raises ValueError: when ``bucket_name`` or ``key`` is missing
        :raises RuntimeError: when the S3 client reports an error; the client error is the ``__cause__``
        """
        bucket_name = options.pop('bucket_name', None)
        key = options.pop('key', None)

        if bucket_name is None or key is None:
            raise ValueError('You should pass a bucket and key name')

        is_temporary = filename is None
        if is_temporary:
            target = tempfile.NamedTemporaryFile(prefix=basename(key), delete=False)
        else:
            target = open(filename, 'wb')

        try:
            with target as f:
                self.s3.download_fileobj(bucket_name, key, f)
        except self.err as e:
            logging.error(e)
            if is_temporary:
                # The caller never receives the temporary path, so a failed download must not leave it on disk.
                import os
                try:
                    os.remove(target.name)
                except OSError:
                    pass
            raise RuntimeError(e) from e

        logging.info('The %s file was downloaded', target.name)
        return target.name
87
88
89
class GCS(Clouds):
    """
    Google Cloud Storage backend that transfers files through a ``google.cloud.storage`` client.
    """

    # Holds the most recently created client. Kept for backward compatibility with code that reads
    # ``GCS.CLIENT``; the methods below use the per-instance client instead.
    CLIENT = None

    def __init__(self, **options):
        """
        Create the storage client used by this instance.

        :param options: keyword arguments passed unchanged to ``storage.Client(...)``
        :raises ImportError: when ``google.cloud.storage`` cannot be imported
        """
        try:
            from google.cloud import storage
        except ImportError as e:
            raise ImportError("No specified import name! make sure that you have installed the package via pip:\n\n"
                              "pip install google-cloud-storage") from e
        # Store the client on the instance so that several GCS objects (for example with different
        # credentials) no longer overwrite each other's client through the shared class attribute.
        self.CLIENT = storage.Client(**options)
        GCS.CLIENT = self.CLIENT

    def upload_directory(self, directory, **options):
        """
        Upload every regular file located directly inside ``directory`` (sub-directories are skipped).

        :param directory: local directory whose files are uploaded
        :param options: ``bucket_name`` (required), ``folder`` (optional name prefix, default ``''``);
            any remaining options are passed to ``bucket.blob(...)``
        :raises ValueError: when ``bucket_name`` is missing
        """
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        bucket = self.CLIENT.get_bucket(bucket_name)
        folder = options.pop('folder', '')
        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        for file in files:
            # Object names always use forward slashes, even when join() produced backslashes.
            blob = bucket.blob(join(folder, file).replace("\\", "/"), **options)
            blob.upload_from_filename(join(directory, file))

        logging.info("The %s directory was uploaded to Google Cloud Storage successfully", directory)

    def download(self, filename=None, **options):
        """
        Download one object and return the path of the local file.

        :param filename: destination path; when ``None`` a temporary file prefixed with the object's
            base name is created and kept on disk
        :param options: ``bucket_name`` and ``object_name`` (both required); any remaining options
            are passed to ``bucket.get_blob(...)``
        :return: the path of the file that was written
        :raises ValueError: when ``bucket_name`` or ``object_name`` is missing
        :raises RuntimeError: when the object does not exist in the bucket
        """
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        # Validate all arguments before the first network call.
        object_name = options.pop('object_name', None)
        if object_name is None:
            raise ValueError('You should pass an object name')

        bucket = self.CLIENT.get_bucket(bucket_name)

        # get_blob() returns None for a missing object; fail with a clear message instead of an
        # AttributeError, and do it before any temporary file is created.
        blob = bucket.get_blob(object_name, **options)
        if blob is None:
            error = "The {} object was not found in the {} bucket".format(object_name, bucket_name)
            logging.error(error)
            raise RuntimeError(error)

        if filename is None:
            with tempfile.NamedTemporaryFile(prefix=basename(object_name), delete=False) as tmp:
                filename = tmp.name

        blob.download_to_filename(filename)
        logging.info('The %s file was downloaded', filename)

        return filename
135
136
137
class MAS(Clouds):
    """
    Microsoft Azure Storage backend that transfers files through a ``BlockBlobService``.
    """

    def __init__(self, **options):
        """
        Create the blob service used by every transfer.

        :param options: keyword arguments passed unchanged to ``BlockBlobService(...)``
        :raises ImportError: when ``BlockBlobService`` cannot be imported from ``azure.storage.blob``
        """
        # NOTE(review): BlockBlobService appears to belong to the legacy (pre-12) azure-storage-blob
        # SDK - confirm which package version this project pins.
        try:
            from azure.storage.blob import BlockBlobService
        except ImportError as e:
            raise ImportError("No specified import name! make sure that you have installed the package via pip:\n\n"
                              "pip install azure-storage-blob") from e
        self.block_blob_service = BlockBlobService(**options)

    def upload_directory(self, directory, **options):
        """
        Upload every regular file located directly inside ``directory`` (sub-directories are skipped).

        Each blob is named after its file name.

        :param directory: local directory whose files are uploaded
        :param options: ``container`` (required)
        :raises ValueError: when ``container`` is missing
        :raises RuntimeError: when an upload fails; the original error is the ``__cause__``
        """
        container = options.pop('container', None)
        if container is None:
            raise ValueError('You should pass a container name')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                self.block_blob_service.create_blob_from_path(container, file, join(directory, file))
        except Exception as e:
            # Catch Exception rather than everything, so KeyboardInterrupt/SystemExit still propagate.
            error = "An error occurred while uploading the directory"
            logging.error("%s: %s", error, e)
            raise RuntimeError(error) from e

    def download(self, filename=None, **options):
        """
        Download one blob and return the path of the local file.

        :param filename: destination path; when ``None`` a temporary file prefixed with the blob's
            base name is created and kept on disk
        :param options: ``container`` and ``blob`` (both required)
        :return: the path of the file that was written
        :raises ValueError: when ``container`` or ``blob`` is missing
        :raises RuntimeError: when the download fails; the original error is the ``__cause__``
        """
        container = options.pop('container', None)
        blob = options.pop('blob', None)

        if container is None or blob is None:
            raise ValueError('You should pass a container name and a blob name')

        is_temporary = filename is None
        if is_temporary:
            with tempfile.NamedTemporaryFile(prefix=basename(blob), delete=False) as tmp:
                filename = tmp.name

        try:
            self.block_blob_service.get_blob_to_path(container, blob, filename)
        except Exception as e:
            # Catch Exception rather than everything, so KeyboardInterrupt/SystemExit still propagate.
            error = "An error occurred while downloading the file"
            logging.error("%s: %s", error, e)
            if is_temporary:
                # The caller never receives the temporary path, so a failed download must not leave it on disk.
                import os
                try:
                    os.remove(filename)
                except OSError:
                    pass
            raise RuntimeError(error) from e

        logging.info('The %s file was downloaded', filename)
        return filename
184
185
186
class CloudManager:
    """
    Keep a list of cloud backends with their call options and run one operation on all of them.
    """

    def __init__(self, filename: str = None):
        """
        :param filename: stored as ``self.filename``; this class does not use it itself
        """
        self.clouds = []
        self.filename = filename

    def add(self, cloud: Clouds, **options):
        """
        Register ``cloud`` together with the keyword options used when it is called later.

        :return: this manager, so calls can be chained
        """
        entry = (cloud, options)
        self.clouds.append(entry)
        return self

    def transfer(self, method, path):
        """
        Call the method named ``method`` on every registered cloud, in registration order.

        :param method: attribute name looked up on each cloud object
        :param path: first positional argument of each call; the stored options follow as keywords
        """
        for entry in self.clouds:
            backend, kwargs = entry[0], entry[1]
            operation = getattr(backend, method)
            operation(path, **kwargs)
201
202
203
# Names exported by a star-import of this module.
__all__ = ['Clouds', 'CloudManager', 'S3', 'GCS', 'MAS']
210