Completed
Push — master ( 22cbe7...53ce72 )
by Matěj
22s queued 12s
created

timestamp_to_epoch_time()   A

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 18
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
import re
3
import os
4
import pathlib
5
6
import xml.etree.ElementTree as ET
7
8
from datetime import datetime
9
from nextcloud.base import WithRequester
10
11
12
class WebDAV(WithRequester):
    """
    File operations on Nextcloud storage through the WebDAV endpoint.

    Every method takes the uid of the user whose files are addressed and
    builds the request path as "<uid>/<path>" before delegating to
    self.requester.
    """

    API_URL = "/remote.php/dav/files"

    def __init__(self, *args, **kwargs):
        """
        Args:
            *args: positional arguments forwarded to the parent initializer
            **kwargs: only 'json_output' is read here; keyword arguments are
                not forwarded to the parent initializer
        """
        super(WebDAV, self).__init__(*args)
        self.json_output = kwargs.get('json_output')

    def _fill_files_data(self, resp):
        """
        Replace resp.data (multistatus XML) with the parsed list of files.

        Args:
            resp: response object whose data attribute holds the XML body

        Returns:
            the same response object; data is None if the request failed,
            a list of dicts if json_output, a list of File objects otherwise
        """
        if not resp.is_ok:
            resp.data = None
            return resp
        # every direct child of the multistatus root describes one resource
        files_data = [File(single_file) for single_file in ET.fromstring(resp.data)]
        if self.json_output:
            resp.data = [each.as_dict() for each in files_data]
        else:
            resp.data = files_data
        return resp

    def list_folders(self, uid, path=None, depth=1, all_properties=False):
        """
        Get path files list with files properties for given user, with given depth

        Args:
            uid (str): uid of user
            path (str/None): files path
            depth (int): depth of listing files (directories content for example)
            all_properties (bool): list all available file properties in Nextcloud

        Returns:
            response object; its data attribute is
            None if the request failed,
            list of dicts if json_output,
            list of File objects if not json_output
        """
        if all_properties:
            data = """<?xml version="1.0"?>
                <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
                            xmlns:nc="http://nextcloud.org/ns">
                  <d:prop>
                        <d:getlastmodified />
                        <d:getetag />
                        <d:getcontenttype />
                        <d:resourcetype />
                        <oc:fileid />
                        <oc:permissions />
                        <oc:size />
                        <d:getcontentlength />
                        <nc:has-preview />
                        <oc:favorite />
                        <oc:comments-unread />
                        <oc:owner-display-name />
                        <oc:share-types />
                  </d:prop>
                </d:propfind>
            """
        else:
            # no request body: the server decides which properties to return
            data = None
        additional_url = uid
        if path:
            additional_url = "{}/{}".format(additional_url, path)
        resp = self.requester.propfind(additional_url=additional_url,
                                       headers={"Depth": str(depth)},
                                       data=data)
        return self._fill_files_data(resp)

    def download_file(self, uid, path):
        """
        Download file of given user by path
        File will be saved to working directory
        path argument must be valid file path
        Modified time of saved file will be synced with the file properties in Nextcloud

        Exception will be raised if:
            * path doesn't exist,
            * path is a directory, or if
            * file with same name already exists in working directory

        Args:
            uid (str): uid of user
            path (str): file path

        Returns:
            None

        Raises:
            ValueError: in any of the three situations listed above
        """
        additional_url = "/".join([uid, path])
        filename = path.split('/')[-1]
        file_data = self.list_folders(uid=uid, path=path, depth=0)
        # list_folders sets data to None when the request failed, so the
        # response object alone is not enough to tell that the path exists
        if not file_data or not file_data.data:
            raise ValueError("Given path doesn't exist")

        # entries are dicts with json_output and File objects without it;
        # a File only carries the attributes the server actually returned
        file_info = file_data.data[0]
        if self.json_output:
            file_resource_type = file_info.get('resource_type')
            file_timestamp_str = file_info.get('last_modified')
        else:
            file_resource_type = getattr(file_info, 'resource_type', None)
            file_timestamp_str = getattr(file_info, 'last_modified', None)

        if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
            raise ValueError("This is a collection, please specify file path")
        if os.path.lexists(filename):
            raise ValueError("File with such name already exists in this directory")
        res = self.requester.download(additional_url)
        with open(filename, 'wb') as f:
            f.write(res.data)

        # get timestamp of downloaded file from file property on Nextcloud
        # If it succeeded, set the timestamp to saved local file
        # If the timestamp string is missing, invalid or broken, the local
        # file keeps the time of the download.
        file_timestamp = (timestamp_to_epoch_time(file_timestamp_str)
                          if file_timestamp_str else None)
        if isinstance(file_timestamp, int):
            # (access time, modification time)
            os.utime(filename, (datetime.now().timestamp(), file_timestamp))

    def upload_file(self, uid, local_filepath, remote_filepath, timestamp=None):
        """
        Upload file to Nextcloud storage

        Args:
            uid (str): uid of user
            local_filepath (str): path to file on local storage
            remote_filepath (str): path where to upload file on Nextcloud storage
            timestamp (int): timestamp of upload file. If None, get time by local file.

        Returns:
            result of upload_file_contents
        """
        with open(local_filepath, 'rb') as f:
            file_contents = f.read()
        if timestamp is None:
            timestamp = int(os.path.getmtime(local_filepath))
        return self.upload_file_contents(uid, file_contents, remote_filepath, timestamp)

    def upload_file_contents(self, uid, file_contents, remote_filepath, timestamp=None):
        """
        Upload file to Nextcloud storage

        Args:
            uid (str): uid of user
            file_contents (bytes): Bytes the file to be uploaded consists of
            remote_filepath (str): path where to upload file on Nextcloud storage
            timestamp (int):  mtime of upload file

        Returns:
            result of the requester's put_with_timestamp call
        """
        additional_url = "/".join([uid, remote_filepath])
        return self.requester.put_with_timestamp(additional_url, data=file_contents, timestamp=timestamp)

    def create_folder(self, uid, folder_path):
        """
        Create folder on Nextcloud storage

        Args:
            uid (str): uid of user
            folder_path (str): folder path

        Returns:
            result of the requester's make_collection call
        """
        return self.requester.make_collection(additional_url="/".join([uid, folder_path]))

    def assure_folder_exists(self, uid, folder_path):
        """
        Create folder on Nextcloud storage, don't do anything if the folder already exists.

        Args:
            uid (str): uid of user
            folder_path (str): folder path

        Returns:
            True, always
        """
        # NOTE(review): the result of create_folder is discarded, so a failure
        # other than "already exists" is not reported to the caller - confirm
        # this best-effort behaviour is intended.
        self.create_folder(uid, folder_path)
        return True

    def assure_tree_exists(self, uid, tree_path):
        """
        Make sure that the folder structure on Nextcloud storage exists

        Args:
            uid (str): uid of user
            tree_path (str): The folder tree, separated by forward slashes

        Returns:
            result of the last assure_folder_exists call
        """
        # Remote paths always use "/", whatever the local platform is, so the
        # path is handled as a POSIX path.
        tree = pathlib.PurePosixPath(tree_path)
        parents = list(tree.parents)
        ret = True
        # parents come deepest first and end with the anchor ('.' or '/'):
        # drop the anchor and create the folders from the top down
        subfolders = parents[:-1][::-1] + [tree]
        for subf in subfolders:
            ret = self.assure_folder_exists(uid, str(subf))
        return ret

    def delete_path(self, uid, path):
        """
        Delete file or folder with all content of given user by path

        Args:
            uid (str): uid of user
            path (str): file or folder path to delete

        Returns:
            result of the requester's delete call
        """
        url = "/".join([uid, path])
        return self.requester.delete(url=url)

    def move_path(self, uid, path, destination_path, overwrite=False):
        """
        Move file or folder to destination

        Args:
            uid (str): uid of user
            path (str): file or folder path to move
            destination_path (str): destination where to move
            overwrite (bool): allow destination path overriding

        Returns:
            result of the requester's move call
        """
        path_url = "/".join([uid, path])
        destination_path_url = "/".join([uid, destination_path])
        return self.requester.move(url=path_url,
                                   destination=destination_path_url, overwrite=overwrite)

    def copy_path(self, uid, path, destination_path, overwrite=False):
        """
        Copy file or folder to destination

        Args:
            uid (str): uid of user
            path (str): file or folder path to copy
            destination_path (str): destination where to copy
            overwrite (bool): allow destination path overriding

        Returns:
            result of the requester's copy call
        """
        path_url = "/".join([uid, path])
        destination_path_url = "/".join([uid, destination_path])
        return self.requester.copy(url=path_url,
                                   destination=destination_path_url, overwrite=overwrite)

    def set_favorites(self, uid, path):
        """
        Set files of a user favorite

        Args:
            uid (str): uid of user
            path (str): file or folder path to make favorite

        Returns:
            result of the requester's proppatch call
        """
        data = """<?xml version="1.0"?>
        <d:propertyupdate xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
          <d:set>
                <d:prop>
                  <oc:favorite>1</oc:favorite>
                </d:prop>
          </d:set>
        </d:propertyupdate>
        """
        url = "/".join([uid, path])
        return self.requester.proppatch(additional_url=url, data=data)

    def list_favorites(self, uid, path=""):
        """
        List favorite files of a user

        Args:
            uid (str): uid of user
            path (str): file or folder path to search for favorites

        Returns:
            response object; its data attribute is
            None if the request failed,
            list of dicts if json_output,
            list of File objects if not json_output
        """
        data = """<?xml version="1.0"?>
        <oc:filter-files xmlns:d="DAV:"
                         xmlns:oc="http://owncloud.org/ns"
                         xmlns:nc="http://nextcloud.org/ns">
                 <oc:filter-rules>
                         <oc:favorite>1</oc:favorite>
                 </oc:filter-rules>
         </oc:filter-files>
        """
        url = "/".join([uid, path])
        res = self.requester.report(additional_url=url, data=data)
        return self._fill_files_data(res)
268
269
270
class File(object):
    """
    Properties of one resource taken from a WebDAV multistatus response.

    Each known property that the server reported successfully is stored as an
    instance attribute named after the matching FILE_PROPERTIES value.
    Properties the server did not return are not set at all.
    """

    # status line of a propstat whose properties were found
    SUCCESS_STATUS = 'HTTP/1.1 200 OK'

    # key is NextCloud property, value is python variable name
    FILE_PROPERTIES = {
        # d:
        "getlastmodified": "last_modified",
        "getetag": "etag",
        "getcontenttype": "content_type",
        "resourcetype": "resource_type",
        "getcontentlength": "content_length",
        # oc:
        "id": "id",
        "fileid": "file_id",
        "favorite": "favorite",
        "comments-href": "comments_href",
        "comments-count": "comments_count",
        "comments-unread": "comments_unread",
        "owner-id": "owner_id",
        "owner-display-name": "owner_display_name",
        "share-types": "share_types",
        "checksums": "check_sums",
        "size": "size",
        "href": "href",
        # nc:
        "has-preview": "has_preview",
    }
    # prefix to namespace URI, used for the ElementTree find() calls
    xml_namespaces_map = {
        "d": "DAV:",
        "oc": "http://owncloud.org/ns",
        "nc": "http://nextcloud.org/ns"
    }
    COLLECTION_RESOURCE_TYPE = 'collection'

    def __init__(self, xml_data):
        """
        Args:
            xml_data (xml.etree.ElementTree.Element): one response element of
                a multistatus document; it must contain a d:href child
        """
        self.href = xml_data.find('d:href', self.xml_namespaces_map).text
        for propstat in xml_data.iter('{DAV:}propstat'):
            status = propstat.find('d:status', self.xml_namespaces_map)
            # skip properties the server could not provide (e.g. 404) and
            # malformed propstat elements without a status
            if status is None or status.text != self.SUCCESS_STATUS:
                continue
            properties = propstat.find('d:prop', self.xml_namespaces_map)
            if properties is None:
                continue
            for file_property in properties:
                file_property_name = self._strip_namespace(file_property.tag)
                attribute_name = self.FILE_PROPERTIES.get(file_property_name)
                if attribute_name is None:
                    continue
                if file_property_name == 'resourcetype':
                    value = self._extract_resource_type(file_property)
                else:
                    value = file_property.text
                setattr(self, attribute_name, value)

    def __repr__(self):
        return "{}(href={!r})".format(type(self).__name__, self.href)

    @staticmethod
    def _strip_namespace(tag):
        """Return tag without its "{namespace}" prefix."""
        return re.sub(r"\{.*\}", "", tag)

    def _extract_resource_type(self, file_property):
        """
        Return the tag name of the first child of a resourcetype element
        (for example 'collection'), or None if the element has no children.
        """
        file_type = list(file_property)
        if file_type:
            return self._strip_namespace(file_type[0].tag)
        return None

    def as_dict(self):
        """Return the properties that were set on this object as a dict."""
        return {key: value
                for key, value in self.__dict__.items()
                if key in self.FILE_PROPERTIES.values()}
329
330
331
class WebDAVStatusCodes(object):
    """HTTP status codes named for use with WebDAV responses."""

    CREATED_CODE = 201
    NO_CONTENT_CODE = 204
    MULTISTATUS_CODE = 207
    # 405 Method Not Allowed
    ALREADY_EXISTS_CODE = 405
    PRECONDITION_FAILED_CODE = 412
337
338
339
def timestamp_to_epoch_time(rfc1123_date=""):
    """
    Convert a literal date time string (as used in DAV:getlastmodified) to Epoch time

    Only rfc1123-date productions are legal as values for DAV:getlastmodified.
    However, the value may be missing, broken or invalid.

    Args:
        rfc1123_date (str): rfc1123-date (defined in RFC2616),
            for example "Sun, 06 Nov 1994 08:49:37 GMT"
    Return:
        int or None : Epoch time, if date string value is invalid return None
    """
    try:
        parsed = datetime.strptime(rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT')
    except (TypeError, ValueError):
        # TypeError: value is not a string (for example None)
        # ValueError: DAV:getlastmodified property is broken or invalid
        return None
    # The string is in GMT, but strptime returns a naive datetime and
    # datetime.timestamp() would read it as local time. The difference to the
    # naive epoch start gives the same result in every local time zone.
    return int((parsed - datetime(1970, 1, 1)).total_seconds())
357