GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

b2blaze.models.file_list.B2FileList.get_versions()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 21
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 4.679

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 4
dl 0
loc 21
ccs 3
cts 7
cp 0.4286
crap 4.679
rs 10
c 0
b 0
f 0
1
"""
2
Copyright George Sibble 2018
3
"""
4
5 1
from .b2_file import B2File
6 1
from ..utilities import b2_url_encode, get_content_length, get_part_ranges, decode_error, RangeStream, StreamWithHashProgress
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (125/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
Unused Code introduced by
Unused decode_error imported from utilities
Loading history...
Unused Code introduced by
Unused StreamWithHashProgress imported from utilities
Loading history...
7 1
from ..b2_exceptions import B2Exception, B2FileNotFoundError
8 1
from multiprocessing.dummy import Pool as ThreadPool
0 ignored issues
show
introduced by
standard import "from multiprocessing.dummy import Pool as ThreadPool" should be placed before "from .b2_file import B2File"
Loading history...
9 1
from ..api import API
10
11 1
class B2FileList(object):
0 ignored issues
show
Documentation introduced by
Empty class docstring
Loading history...
12
    """
13
14
    """
15 1
    def __init__(self, connector, bucket):
16
        """
17
18
        :param connector:
19
        :param bucket:
20
        """
21 1
        self.connector = connector
22 1
        self.bucket = bucket
23 1
        self._files_by_name = {}
24 1
        self._files_by_id = {}
25
26 1
    def all(self, include_hidden=False, limit=None):
27
        """ Return an updated list of all files.
28
            (This does not include hidden files unless include_hidden flag set to True)
29
30
            Parameters:
31
                include_hidden:         (bool) Include hidden files
32
                limit:                  (int)  Limit number of file results
33
34
        """
35 1
        if not include_hidden:
36 1
            return self._update_files_list(retrieve=True, limit=limit)
37
        else:
38 1
            results = self.all_file_versions(limit=limit)
39 1
            versions = results['file_versions']
40 1
            file_ids = results['file_ids']
41 1
            if versions:
42
                # Return only the first file from a given file with multiple versions
43 1
                files = [versions[f][0] for f in file_ids]
44 1
                return files
45 1
        return []   # Return empty set on no results
46
47 1
    def delete_all(self, confirm=False):
48
        """ Delete all files in the bucket. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
49
            Parameters:
50
                confirm:    (bool)  Safety check. Confirm deletion
51
        """ 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
52 1
        if not confirm:
53
            raise Exception('This will delete all files! Pass confirm=True')
54
        
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
55 1
        all_files = self.all(include_hidden=True)
56 1
        try:
57 1
            for f in all_files:
0 ignored issues
show
Coding Style Naming introduced by
The name f does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
58 1
                f.delete_all_versions(confirm=True)
59
        except Exception as E:
0 ignored issues
show
Coding Style Naming introduced by
The name E does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
60
            raise B2Exception.parse(E)
61 1
        return []
62
63
        
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
64 1
    def _update_files_list(self, retrieve=False, limit=None):
65
        """ Retrieve list of all files in bucket 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
66
            Parameters:
67
                limit:      (int)  Max number of file results, default 10000
68
                retrieve:   (bool) Refresh local store. (default: false)
69
        """
70 1
        path = API.list_all_files
71 1
        files = []
72 1
        new_files_to_retrieve = True
73 1
        params = {
74
            'bucketId': self.bucket.bucket_id,
75
            'maxFileCount': limit or 10000
76
        }
77 1
        while new_files_to_retrieve:
78 1
            response = self.connector.make_request(path=path, method='post', params=params)
79 1
            if response.status_code == 200:
80 1
                files_json = response.json()
81 1
                self._files_by_name = {}
82 1
                self._files_by_id = {}
83 1
                for file_json in files_json['files']:
84 1
                    new_file = B2File(connector=self.connector, parent_list=self, **file_json)
85 1
                    files.append(new_file)
86 1
                    self._files_by_name[file_json['fileName']] = new_file
87 1
                    self._files_by_id[file_json['fileId']] = new_file
88 1
                if files_json['nextFileName'] is None:
89 1
                    new_files_to_retrieve = False
90
                else:
91
                    params['startFileName'] = files_json['nextFileName']
92
            else:
93
                raise B2Exception.parse(response)
94 1
        if retrieve:
95 1
            return files
96
97
98 1
    def get(self, file_name=None, file_id=None):
99
        """ Get a file by file name or id.
100
            Required:
101
                file_name or file_id
102
103
            Parameters:
104
                file_name:          (str) File name 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
105
                file_id:            (str) File ID 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
106
        """
107 1
        if file_name:
108 1
            file = self._get_by_name(file_name)
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in file.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
109
110 1
        elif file_id:
111 1
            file = self._get_by_id(file_id)
112
        else:
113
            raise ValueError('file_name or file_id must be passed')
114
        
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
115 1
        return file
116
117
118 1
    def get_versions(self, file_name=None, file_id=None, limit=None):
0 ignored issues
show
Unused Code introduced by
The argument limit seems to be unused.
Loading history...
119
        """ Return list of all the versions of one file in current bucket. 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
120
            Required:
121
                file_id or file_name   (either)
122
123
            Params:
124
                file_id:            (str) File id
125
                file_name:          (str) File id
126
                limit:              (int) Limit number of results returned (optional)
127
128
            Returns:
129
                file_versions       (list) B2FileObject of all file versions
130
        """ 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
131 1
        if file_name:
132 1
            file = self.get(file_name)
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in file.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
133
134
        elif file_id:
135
            file = self.get(file_id=file_id)
136
        else:
137
            raise ValueError('Either file_id or file_name required for get_versions')
138
        return file.get_versions()
139
        
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
140
141 1
    def all_file_versions(self, limit=None):
142
        """ Return all the versions of all files in a given bucket.
143
144
            Params:
145
                limit:              (int) Limit number of results returned (optional). Defaults to 10000
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (104/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
146
147
            Returns dict: 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
148
                'file_names':       (list) String filenames
149
                'file_ids':         (list) File IDs
150
                'file_versions':    (dict) b2blaze File objects, keyed by file name
151
        """ 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
152
153 1
        path = API.list_file_versions
154 1
        file_versions = dict()
155 1
        file_names = []
156 1
        file_ids = []
157 1
        new_files_to_retrieve = True
158 1
        params = {
159
            'bucketId': self.bucket.bucket_id,
160
            'maxFileCount': 10000
161
        }
162
163
        # Limit files
164 1
        if limit:
165
            params['maxFileCount'] = limit
166
167 1
        while new_files_to_retrieve:
168
169 1
            response = self.connector.make_request(path=path, method='post', params=params)
170 1
            if response.status_code == 200:
171 1
                files_json = response.json()
172 1
                for file_json in files_json['files']:
173 1
                    new_file = B2File(connector=self.connector, parent_list=self, **file_json)
174
175
                    # Append file_id, file_name to lists
176 1
                    file_name, file_id = file_json['fileName'], file_json['fileId']
177 1
                    file_names.append(file_name)
178 1
                    file_ids.append(file_id)
179
                    
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
180
                    # Add file to list keyed by file_id
181 1
                    if file_id in file_versions:
182
                        file_versions[file_id].append(new_file)
183
                    else:
184 1
                        file_versions[file_id] = [new_file]
185
186 1
                if files_json['nextFileName'] is None:
187 1
                    new_files_to_retrieve = False
188
                else:
189
                    params['startFileName'] = files_json['nextFileName']
190
            else:
191
                raise B2Exception.parse(response)
192 1
        return {'file_names': file_names, 'file_versions': file_versions, 'file_ids': file_ids}
193
194
195 1
    def _get_by_name(self, file_name):
196
        """ Internal method, return single file by file name """ 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
197 1
        path = API.list_all_files
198 1
        params = {
199
            'prefix': b2_url_encode(file_name),
200
            'bucketId': self.bucket.bucket_id
201
        }
202
203 1
        response = self.connector.make_request(path, method='post', params=params)
204 1
        file_json = response.json()
205
206
        # Handle errors and empty files
207 1
        if not response.status_code == 200:
208
            raise B2Exception.parse(response)
209 1
        if not len(file_json['files']) > 0:
0 ignored issues
show
Unused Code introduced by
Do not use len(SEQUENCE) as condition value
Loading history...
210 1
            raise B2FileNotFoundError('Filename {} not found'.format(file_name))
211
        else:
212 1
            return B2File(connector=self.connector, parent_list=self, **file_json['files'][0])
213
214 1
    def _get_by_id(self, file_id):
215
        """ Internal method, return single file by file id """ 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
216 1
        path = API.file_info
217 1
        params = {
218
            'fileId': file_id
219
        }
220 1
        response = self.connector.make_request(path, method='post', params=params)
221 1
        if response.status_code == 200:
222 1
            file_json = response.json()
223 1
            return B2File(connector=self.connector, parent_list=self, **file_json)
224
        else:
225 1
            raise B2Exception.parse(response)
226
            
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
227
228 1
    def upload(self, contents, file_name, mime_content_type=None, content_length=None, progress_listener=None):
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (111/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
best-practice introduced by
Too many arguments (6/5)
Loading history...
Unused Code introduced by
The argument mime_content_type seems to be unused.
Loading history...
229
        """
230
231
        :param contents:
232
        :param file_name:
233
        :param mime_content_type:
234
        :param content_length:
235
        :param progress_listener:
236
        :return:
237
        """
238 1
        if file_name[0] == '/':
239
            file_name = file_name[1:]
240 1
        get_upload_url_path = API.upload_url
241 1
        params = {
242
            'bucketId': self.bucket.bucket_id
243
        }
244 1
        upload_url_response = self.connector.make_request(path=get_upload_url_path, method='post', params=params)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (113/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
245 1
        if upload_url_response.status_code == 200:
246 1
            upload_url = upload_url_response.json().get('uploadUrl', None)
247 1
            auth_token = upload_url_response.json().get('authorizationToken', None)
248 1
            upload_response = self.connector.upload_file(file_contents=contents, file_name=file_name,
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (101/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
249
                                                         upload_url=upload_url, auth_token=auth_token,
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (102/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
250
                                                         content_length=content_length, progress_listener=progress_listener)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (124/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
251 1
            if upload_response.status_code == 200:
252 1
                new_file = B2File(connector=self.connector, parent_list=self, **upload_response.json())
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (103/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
253
                # Update file list after upload
254 1
                self._update_files_list()
255 1
                return new_file
256
            else:
257
                raise B2Exception.parse(upload_response)
258
        else:
259
            raise B2Exception.parse(upload_url_response)
260
261 1
    def upload_large_file(self, contents, file_name, part_size=None, num_threads=4,
0 ignored issues
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (19/15).
Loading history...
262
                          mime_content_type=None, content_length=None, progress_listener=None):
263
        """
264
265
        :param contents:
266
        :param file_name:
267
        :param part_size:
268
        :param num_threads:
269
        :param mime_content_type:
270
        :param content_length:
271
        :param progress_listener:
272
        :return:
273
        """
274
        if file_name[0] == '/':
275
            file_name = file_name[1:]
276
        if part_size == None:
0 ignored issues
show
introduced by
Comparison to None should be 'expr is None'
Loading history...
277
            part_size = self.connector.recommended_part_size
278
        if content_length == None:
0 ignored issues
show
introduced by
Comparison to None should be 'expr is None'
Loading history...
279
            content_length = get_content_length(contents)
280
        start_large_file_path = API.upload_large
281
        params = {
282
            'bucketId': self.bucket.bucket_id,
283
            'fileName': b2_url_encode(file_name),
284
            'contentType': mime_content_type or 'b2/x-auto'
285
        }
286
        large_file_response = self.connector.make_request(path=start_large_file_path, method='post', params=params)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (115/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
287
        if large_file_response.status_code == 200:
288
            file_id = large_file_response.json().get('fileId', None)
289
            get_upload_part_url_path = API.upload_large_part
290
            params = {
291
                'fileId': file_id
292
            }
293
            pool = ThreadPool(num_threads)
294
            def upload_part_worker(args):
0 ignored issues
show
Coding Style introduced by
This function should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
295
                part_number, part_range = args
296
                offset, content_length = part_range
297
                with open(contents.name, 'rb') as file:
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in file.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
298
                    file.seek(offset)
299
                    stream = RangeStream(file, offset, content_length)
300
                    upload_part_url_response = self.connector.make_request(path=get_upload_part_url_path, method='post', params=params)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (135/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
301
                    if upload_part_url_response.status_code == 200:
302
                        upload_url = upload_part_url_response.json().get('uploadUrl')
303
                        auth_token = upload_part_url_response.json().get('authorizationToken')
304
                        upload_part_response = self.connector.upload_part(file_contents=stream, content_length=content_length,
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (126/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
305
                                                                          part_number=part_number, upload_url=upload_url,
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (121/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
306
                                                                          auth_token=auth_token, progress_listener=progress_listener)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (133/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
307
                        if upload_part_response.status_code == 200:
308
                            return upload_part_response.json().get('contentSha1', None)
309
                        else:
310
                            raise B2Exception.parse(upload_part_response)
311
                    else:
312
                        raise B2Exception.parse(upload_part_url_response)
313
            sha_list = pool.map(upload_part_worker, enumerate(get_part_ranges(content_length, part_size), 1))
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (109/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
314
            pool.close()
315
            pool.join()
316
            finish_large_file_path = API.upload_large_finish
317
            params = {
318
                'fileId': file_id,
319
                'partSha1Array': sha_list
320
            }
321
            finish_large_file_response = self.connector.make_request(path=finish_large_file_path, method='post', params=params)
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (127/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
322
            if finish_large_file_response.status_code == 200:
323
                new_file = B2File(connector=self.connector, parent_list=self, **finish_large_file_response.json())
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (114/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
324
                return new_file
325
            else:
326
                raise B2Exception.parse(finish_large_file_response)
327
        else:
328
            raise B2Exception.parse(large_file_response)
329