Passed
Pull Request — master (#69)
by
unknown
04:24
created

nextcloud.api_wrappers.webdav.File.dirname()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
"""
3
WebDav API wrapper
4
See https://docs.nextcloud.com/server/14/developer_manual/client_apis/WebDAV/index.html
5
    https://doc.owncloud.com/server/developer_manual/webdav_api/search.html
6
    https://docs.nextcloud.com/server/14/developer_manual/client_apis/WebDAV/search.html
7
8
Not implemented yet:
9
   - search feature
10
   - trash
11
   - versions
12
   - chunked file upload
13
"""
14
# implementing dav search
15
#-> add a function to build xml search
16
#   see ../common/simplexml.py and ../common/collections.py
17
import re
18
import os
19
try:
20
    import pathlib
21
except ImportError:
22
    import pathlib2 as pathlib
23
24
import xml.etree.ElementTree as ET
25
from datetime import datetime
26
from nextcloud.base import WebDAVApiWrapper
27
from nextcloud.common.collections import PropertySet
28
from nextcloud.common.properties import Property as Prop, NAMESPACES_MAP
29
from nextcloud.common.value_parsing import (
30
    timestamp_to_epoch_time,
31
    datetime_to_timestamp
32
)
33
34
35
class NextCloudDirectoryNotEmpty(Exception):
36
    """ Exception to raise when you try to remove a folder that is not empty"""
37
38
class NextCloudFileConflict(Exception):
39
    """ Exception to raise when you try to create a File that alreay exists """
40
41
42
class File(PropertySet):
43
    """
44
    Define properties on a WebDav file/folder
45
46
    Additionnally, provide an objective CRUD API
47
    (that probably consume more energy than fetching specific attributes)
48
49
    Example :
50
    >>> root = nxc.get_folder()  # get root
51
    >>> def _list_rec(d, indent=""):
52
    >>>     # list files recursively
53
    >>>     print("%s%s%s" % (indent, d.basename(), '/' if d.isdir() else ''))
54
    >>>     if d.isdir():
55
    >>>         for i in d.list():
56
    >>>             _list_rec(i, indent=indent+"  ")
57
    >>>
58
    >>> _list_rec(root)
59
    """
60
61
    @staticmethod
62
    def _extract_resource_type(file_property):
63
        file_type = list(file_property)
64
        if file_type:
65
            return re.sub('{.*}', '', file_type[0].tag)
66
        return None
67
68
    _attrs = [
69
        Prop('d:getlastmodified'),
70
        Prop('d:getetag'),
71
        Prop('d:getcontenttype'),
72
        Prop('d:resourcetype', parse_xml_value=(lambda p: File._extract_resource_type(p))),
73
        Prop('d:getcontentlength'),
74
        Prop('oc:id'),
75
        Prop('oc:fileid'),
76
        Prop('oc:favorite'),
77
        Prop('oc:comments-href'),
78
        Prop('oc:comments-count'),
79
        Prop('oc:comments-unread'),
80
        Prop('oc:owner-id'),
81
        Prop('oc:owner-display-name'),
82
        Prop('oc:share-types'),
83
        Prop('oc:checksums'),
84
        Prop('oc:size'),
85
        Prop('oc:href'),
86
        Prop('nc:has-preview')
87
    ]
88
89
    def isfile(self):
90
        """ say if the file is a file /!\\ ressourcetype property shall be loaded """
91
        return not self.resource_type
92
93
    def isroot(self):
94
        """ say if the file is a directory /!\\ ressourcetype property shall be loaded """
95
        return not self.get_relative_path().replace('/','')
96
97
    def isdir(self):
98
        """ say if the file is a directory /!\\ ressourcetype property shall be loaded """
99
        return self.resource_type == self.COLLECTION_RESOURCE_TYPE
100
101
    def get_relative_path(self):
102
        """ get path relative to user root """
103
        return self._wrapper.get_relative_path(self.href)
104
105
    def _get_remote_path(self, path=None):
106
        _url = self.get_relative_path()
107
        return '/'.join([_url, path]) if path else _url
108
109
    def basename(self):
110
        """ basename """
111
        _path = self._get_remote_path()
112
        return _path.split('/')[-2] if _path.endswith('/') else _path.split('/')[-1]
113
114
    def dirname(self):
115
        """ dirname """
116
        _path = self._get_remote_path()
117
        return '/'.join(_path.split('/')[:-2]) if _path.endswith('/') else '/'.join(_path.split('/')[:-1])
118
119
    def __eq__(self, b):
120
        return self.href == b.href
121
122
    # MINIMAL SET OF CRUD OPERATIONS
123
    def get_folder(self, path=None, all_properties=False):
124
        """
125
        Get folder (see WebDav wrapper)
126
        :param path: if empty list current dir
127
        :param all_properties: fetch all properties (default False)
128
        :returns: a folder (File object)
129
130
        Note : To check if sub folder exists, use get_file method
131
        """
132
        return self._wrapper.get_folder(self._get_remote_path(path),
133
                                        all_properties=all_properties)
134
135
    def get_file(self, path=None, all_properties=False):
136
        """
137
        Get file (see WebDav wrapper)
138
        :param path: if empty, get current file
139
        :param all_properties: fetch all properties (default False)
140
        :returns: a file or folder (File object)
141
        """
142
        return self._wrapper.get_file(self._get_remote_path(path),
143
                                      all_properties=all_properties)
144
145
    def list(self, subpath='', filter_rules=None, all_properties=False):
146
        """
147
        List folder (see WebDav wrapper)
148
        :param subpath: if empty list current dir
149
        :param all_properties: fetch all properties (default False)
150
        :returns: list of Files
151
        """
152
        if filter_rules:
153
            resp = self._wrapper.fetch_files_with_filter(
154
                path=self._get_remote_path(subpath),
155
                filter_rules=filter_rules
156
            )
157
        else:
158
            resp = self._wrapper.list_folders(
159
                self._get_remote_path(subpath),
160
                depth=1,
161
                all_properties=all_properties
162
            )
163
        if resp.is_ok and resp.data:
164
            _dirs = resp.data
165
            # remove current dir
166
            if _dirs[0] == self:
167
                _dirs = _dirs[1:]
168
            return _dirs
169
        return []
170
171
    def upload_file(self, local_filepath, name, timestamp=None):
172
        """
173
        Upload file (see WebDav wrapper)
174
        :param local_filepath: path of the local file
175
        :param name: name of the new file
176
        :param timestamp (int): timestamp of upload file. If None, get time by local file.
177
        :returns: True if success
178
        """
179
        resp = self._wrapper.upload_file(local_filepath,
180
                                         self._get_remote_path(name),
181
                                         timestamp=timestamp)
182
        return resp.is_ok
183
184
    def upload_file_contents(self, file_contents, name=None, timestamp=None):
185
        """
186
        Upload file content (see WebDav wrapper)
187
        :param file_contents: binary content of the file
188
        :param name: name of the new file (current file if empty)
189
        :param timestamp (int):  mtime of upload file
190
        :returns: True if success
191
        """
192
        resp = self._wrapper.upload_file_contents(file_contents,
193
                                         self._get_remote_path(name),
194
                                         timestamp=timestamp)
195
        return resp.is_ok
196
197
198
    def download(self, name=None, target_dir=None):
199
        """
200
        Download file (see WebDav wrapper)
201
        :param name: name of the new file
202
        :returns: True if success
203
        """
204
        path = self._get_remote_path(name)
205
        target_path, _file_info = self._wrapper.download_file(path,
206
                                                              target_dir=target_dir)
207
        assert os.path.isfile(target_path), "Download failed"
208
        return target_path
209
210
    def isempty(self):
211
        """
212
        Say if a folder is emty (always False if not a directory)
213
        :returns: True if current dir is empty
214
        """
215
        if not self.isdir():
216
            return False
217
        return not self.list()
218
219
220
    def delete(self, subpath='', recursive=False):
221
        """
222
        Delete file or folder (see WebDav wrapper)
223
        :param subpath: if empty, delete current file
224
        :param recursive: delete recursively
225
        :returns: True if success
226
        """
227
        if recursive:
228
            resp = self._wrapper.delete_path(self._get_remote_path(subpath))
229
        else:
230
            if subpath:
231
                _file = self.get_file(subpath, all_properties=False)
232
                return _file.delete(recursive=recursive)
233
            if not self.isempty():
234
                raise NextCloudDirectoryNotEmpty(self.get_relative_path())
235
            return self.delete(recursive=True)
236
237
        return resp.is_ok
238
239
240
class WebDAV(WebDAVApiWrapper):
241
    """ WebDav API wrapper """
242
    API_URL = "/remote.php/dav/files"
243
244
    def _get_path(self, path):
245
        if path:
246
            return '/'.join([self.client.user, path]).replace('//', '/')
247
        return self.client.user
248
249
    def list_folders(self, path=None, depth=1, all_properties=False,
250
                     fields=None):
251
        """
252
        Get path files list with files properties with given depth
253
        (for current user)
254
255
        Args:
256
            path (str/None): files path
257
            depth (int): depth of listing files (directories content for example)
258
            all_properties (bool): list all available file properties in Nextcloud
259
            fields (str list): file properties to fetch
260
261
        Returns:
262
            list of dicts if json_output
263
            list of File objects if not json_output
264
        """
265
        data = File.build_xml_propfind(
266
            use_default=all_properties,
267
            fields=fields
268
        ) if (fields or all_properties) else None
269
        resp = self.requester.propfind(additional_url=self._get_path(path),
270
                                       headers={'Depth': str(depth)},
271
                                       data=data)
272
        return File.from_response(resp, json_output=self.json_output,
273
                                  wrapper=self)
274
275
    def download_file(self, path, target_dir=None):
276
        """
277
        Download file by path (for current user)
278
        File will be saved to working directory
279
        path argument must be valid file path
280
        Modified time of saved file will be synced with the file properties in Nextcloud
281
282
        Exception will be raised if:
283
            * path doesn't exist,
284
            * path is a directory, or if
285
            * file with same name already exists in working directory
286
287
        Args:
288
            path (str): file path
289
290
        Returns:
291
            a tuple (target_path, File object)
292
        """
293
        if not target_dir:
294
            target_dir = './'
295
        filename = path.split('/')[(-1)] if '/' in path else path
296
        file_data = self.get_file(path)
297
        if not file_data:
298
            raise ValueError("Given path doesn't exist")
299
        file_resource_type = file_data.resource_type
300
        if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
301
            raise ValueError("This is a collection, please specify file path")
302
        if filename in os.listdir(target_dir):
303
            raise ValueError(
304
                "File with such name already exists in this directory")
305
        filename = os.path.join(target_dir, filename)
306
        res = self.requester.download(self._get_path(path))
307
        with open(filename, 'wb') as f:
308
            f.write(res.data)
309
310
        # get timestamp of downloaded file from file property on Nextcloud
311
        # If it succeeded, set the timestamp to saved local file
312
        # If the timestamp string is invalid or broken, the timestamp is downloaded time.
313
        file_timestamp_str = file_data.last_modified
314
        file_timestamp = timestamp_to_epoch_time(file_timestamp_str)
315
        if isinstance(file_timestamp, int):
316
            os.utime(filename, (
317
                datetime_to_timestamp(datetime.now()),
318
                file_timestamp))
319
        return (filename, file_data)
320
321
    def upload_file(self, local_filepath, remote_filepath, timestamp=None):
322
        """
323
        Upload file to Nextcloud storage
324
325
        Args:
326
            local_filepath (str): path to file on local storage
327
            remote_filepath (str): path where to upload file on Nextcloud storage
328
            timestamp (int): timestamp of upload file. If None, get time by local file.
329
330
        Returns:
331
            requester response
332
        """
333
        with open(local_filepath, 'rb') as f:
334
            file_contents = f.read()
335
        if timestamp is None:
336
            timestamp = int(os.path.getmtime(local_filepath))
337
        return self.upload_file_contents(file_contents, remote_filepath, timestamp)
338
339
    def upload_file_contents(self, file_contents, remote_filepath, timestamp=None):
340
        """
341
        Upload file to Nextcloud storage
342
343
        Args:
344
            file_contents (bytes): Bytes the file to be uploaded consists of
345
            remote_filepath (str): path where to upload file on Nextcloud storage
346
            timestamp (int):  mtime of upload file
347
348
        Returns:
349
            requester response
350
        """
351
        return self.requester.put_with_timestamp(
352
            self._get_path(remote_filepath), data=file_contents, timestamp=timestamp)
353
354
    def create_folder(self, folder_path):
355
        """
356
        Create folder on Nextcloud storage
357
358
        Args:
359
            folder_path (str): folder path
360
361
        Returns:
362
            requester response
363
        """
364
        return self.requester.make_collection(additional_url=(self._get_path(folder_path)))
365
366
    def assure_folder_exists(self, folder_path):
367
        """
368
        Create folder on Nextcloud storage, don't do anything if the folder already exists.
369
        Args:
370
            folder_path (str): folder path
371
        Returns:
372
            requester response
373
        """
374
        self.create_folder(folder_path)
375
        return True
376
377
    def assure_tree_exists(self, tree_path):
378
        """
379
        Make sure that the folder structure on Nextcloud storage exists
380
        Args:
381
            folder_path (str): The folder tree
382
        Returns:
383
            requester response
384
        """
385
        tree = pathlib.PurePath(tree_path)
386
        parents = list(tree.parents)
387
        ret = True
388
        subfolders = parents[:-1][::-1] + [tree]
389
        for subf in subfolders:
390
            ret = self.assure_folder_exists(str(subf))
391
392
        return ret
393
394
    def delete_path(self, path):
395
        """
396
        Delete file or folder with all content of given user by path
397
398
        Args:
399
            path (str): file or folder path to delete
400
401
        Returns:
402
            requester response
403
        """
404
        return self.requester.delete(url=self._get_path(path))
405
406
    def move_path(self, path, destination_path, overwrite=False):
407
        """
408
        Move file or folder to destination
409
410
        Args:
411
            path (str): file or folder path to move
412
            destionation_path (str): destination where to move
413
            overwrite (bool): allow destination path overriding
414
415
        Returns:
416
            requester response
417
        """
418
        return self.requester.move(url=self._get_path(path),
419
                                   destination=self._get_path(
420
                                       destination_path),
421
                                   overwrite=overwrite)
422
423
    def copy_path(self, path, destination_path, overwrite=False):
424
        """
425
        Copy file or folder to destination
426
427
        Args:
428
            path (str): file or folder path to copy
429
            destionation_path (str): destination where to copy
430
            overwrite (bool): allow destination path overriding
431
432
        Returns:
433
            requester response
434
        """
435
        return self.requester.copy(url=self._get_path(path),
436
                                   destination=self._get_path(
437
                                       destination_path),
438
                                   overwrite=overwrite)
439
440
    def set_file_property(self, path, update_rules):
441
        """
442
        Set file property
443
444
        Args:
445
            path (str): file or folder path to make favorite
446
            update_rules : a dict { namespace: {key : value } }
447
448
        Returns:
449
            requester response with list<File> in data
450
451
        Note :
452
            check keys in nextcloud.common.properties.NAMESPACES_MAP for namespace codes
453
            check object property xml_name for property name
454
        """
455
        data = File.build_xml_propupdate(update_rules)
456
        return self.requester.proppatch(additional_url=self._get_path(path), data=data)
457
458
    def fetch_files_with_filter(self, path='', filter_rules=''):
459
        """
460
        List files according to a filter
461
462
        Args:
463
            path (str): file or folder path to search
464
            filter_rules : a dict { namespace: {key : value } }
465
466
        Returns:
467
            requester response with list<File> in data
468
469
        Note :
470
            check keys in nextcloud.common.properties.NAMESPACES_MAP for namespace codes
471
            check object property xml_name for property name
472
        """
473
        data = File.build_xml_propfind(
474
            instr='oc:filter-files', filter_rules=filter_rules)
475
        resp = self.requester.report(
476
            additional_url=self._get_path(path), data=data)
477
        return File.from_response(resp, json_output=self.json_output,
478
                                  wrapper=self)
479
480
    def set_favorites(self, path):
481
        """
482
        Set files of a user favorite
483
484
        Args:
485
            path (str): file or folder path to make favorite
486
487
        Returns:
488
            requester response
489
        """
490
        return self.set_file_property(path, {'oc': {'favorite': 1}})
491
492
    def list_favorites(self, path=''):
493
        """
494
        List favorites (files) of the user
495
496
        Args:
497
            path (str): file or folder path to search favorite
498
499
        Returns:
500
            requester response with list<File> in data
501
        """
502
        return self.fetch_files_with_filter(path, {'oc': {'favorite': 1}})
503
504
    def get_file_property(self, path, field, ns='oc'):
505
        """
506
        Fetch asked properties from a file path.
507
508
        Args:
509
            path (str): file or folder path to make favorite
510
            field (str): field name
511
512
        Returns:
513
            requester response with asked value in data
514
        """
515
        if ':' in field:
516
            ns, field = field.split(':')
517
        get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (ns, field)
518
        data = File.build_xml_propfind(fields={ns: [field]})
519
        resp = self.requester.propfind(additional_url=(self._get_path(path)), headers={'Depth': str(0)},
520
                                       data=data)
521
        response_data = resp.data
522
        resp.data = None
523
        if not resp.is_ok:
524
            return resp
525
526
        response_xml_data = ET.fromstring(response_data)
527
        for xml_data in response_xml_data:
528
            for prop in xml_data.findall(get_file_prop_xpath,
529
                                         NAMESPACES_MAP):
530
                resp.data = prop.text
531
            break
532
533
        return resp
534
535
    def get_file(self, path, all_properties=False):
536
        """
537
        Return the File object associated to the path
538
539
        :param path: path to the file
540
        :returns: File object or None
541
        """
542
        resp = self.client.with_attr(json_output=False).list_folders(
543
            path, all_properties=all_properties, depth=0)
544
        if resp.is_ok:
545
            if resp.data:
546
                return resp.data[0]
547
        return None
548
549
    def get_folder(self, path=None, all_properties=False):
550
        """
551
        Return the File object associated to the path
552
        If the file (folder or 'collection') doesn't exists, create it.
553
554
        :param path: path to the file/folder, if empty use root
555
        :returns: File object
556
        """
557
        fileobj = self.get_file(path, all_properties=all_properties)
558
        if fileobj:
559
            if not fileobj.isdir():
560
                raise NextCloudFileConflict(fileobj.href)
561
        else:
562
            self.client.create_folder(path)
563
            fileobj = self.get_file(path, all_properties=all_properties)
564
565
        return fileobj
566
567
    def get_relative_path(self, href):
568
        """
569
        Returns relative (to application / user) path
570
571
        :param href(str):  file href
572
        :returns   (str):  relative path
573
        """
574
        _app_root = '/'.join([self.API_URL, self.client.user])
575
        return href[len(_app_root):]
576