EnterpriseyIntranet /
nextcloud-API
| 1 | import os |
||
| 2 | from requests.utils import quote |
||
| 3 | from datetime import datetime |
||
| 4 | |||
| 5 | from .base import BaseTestCase, LocalNxcUserMixin |
||
| 6 | from nextcloud.api_wrappers import WebDAV |
||
| 7 | from nextcloud.api_wrappers.webdav import timestamp_to_epoch_time |
||
| 8 | |||
| 9 | |||
| 10 | class TestWebDAV(LocalNxcUserMixin, BaseTestCase): |
||
| 11 | |||
| 12 | CREATED_CODE = 201 |
||
| 13 | NO_CONTENT_CODE = 204 |
||
| 14 | MULTISTATUS_CODE = 207 |
||
| 15 | ALREADY_EXISTS_CODE = 405 |
||
| 16 | PRECONDITION_FAILED_CODE = 412 |
||
| 17 | |||
| 18 | COLLECTION_TYPE = 'collection' |
||
| 19 | |||
| 20 | def create_and_upload_file(self, file_name, file_content, timestamp=None): |
||
| 21 | with open(file_name, "w") as f: |
||
| 22 | f.write(file_content) |
||
| 23 | file_local_path = os.path.abspath(file_name) |
||
| 24 | return self.nxc_local.upload_file(self.user_username, file_local_path, file_name, timestamp) |
||
| 25 | |||
| 26 | def test_list_folders(self): |
||
| 27 | res = self.nxc_local.list_folders(self.user_username) |
||
| 28 | assert res.is_ok |
||
| 29 | assert isinstance(res.data, list) |
||
| 30 | assert isinstance(res.data[0], dict) |
||
| 31 | res = self.nxc_local.list_folders(self.user_username, all_properties=True) |
||
| 32 | assert res.is_ok |
||
| 33 | assert isinstance(res.data, list) |
||
| 34 | assert isinstance(res.data[0], dict) |
||
| 35 | |||
| 36 | def test_upload_download_file(self): |
||
| 37 | file_name = "test_file" |
||
| 38 | file_content = "test file content" |
||
| 39 | file_local_path = os.path.join(os.getcwd(), file_name) |
||
| 40 | res = self.create_and_upload_file(file_name, file_content) |
||
| 41 | # check status code |
||
| 42 | assert res.is_ok |
||
| 43 | assert res.raw.status_code == self.CREATED_CODE |
||
| 44 | |||
| 45 | # test uploaded file can be found with list_folders |
||
| 46 | file_nextcloud_href = os.path.join(WebDAV.API_URL, self.user_username, file_name) |
||
| 47 | folder_info = self.nxc_local.list_folders(self.user_username, path=file_name) |
||
| 48 | assert folder_info.is_ok |
||
| 49 | assert len(folder_info.data) == 1 |
||
| 50 | assert isinstance(folder_info.data[0], dict) |
||
| 51 | # check href |
||
| 52 | assert folder_info.data[0]['href'] == file_nextcloud_href |
||
| 53 | |||
| 54 | # remove file on local machine |
||
| 55 | os.remove(file_local_path) |
||
| 56 | self.nxc_local.download_file(self.user_username, file_name) |
||
| 57 | # test file is downloaded to current dir |
||
| 58 | assert file_name in os.listdir(".") |
||
| 59 | with open(file_local_path, 'r') as f: |
||
| 60 | downloaded_file_content = f.read() |
||
| 61 | assert downloaded_file_content == file_content |
||
| 62 | |||
| 63 | # delete file |
||
| 64 | self.nxc_local.delete_path(self.user_username, file_name) |
||
| 65 | os.remove(file_local_path) |
||
| 66 | |||
| 67 | def test_upload_download_file_with_timestamp(self): |
||
| 68 | file_name = "test_file_1579520460" |
||
| 69 | file_content = "Test file: Mon, 20 Jan 2020 20:41:00 GMT" |
||
| 70 | file_local_path = os.path.join(os.getcwd(), file_name) |
||
| 71 | |||
| 72 | # create test file, and upload it as timestamp: Mon, 20 Jan 2020 20:41:00 GMT |
||
| 73 | timestamp_str = "Mon, 20 Jan 2020 20:41:00 GMT" |
||
| 74 | timestamp = timestamp_to_epoch_time(timestamp_str) |
||
| 75 | assert timestamp == 1579552860 |
||
| 76 | res = self.create_and_upload_file(file_name, file_content, timestamp) |
||
| 77 | |||
| 78 | # check status code |
||
| 79 | assert res.is_ok |
||
| 80 | assert res.raw.status_code == self.CREATED_CODE |
||
| 81 | |||
| 82 | # test uploaded file can be found with list_folders |
||
| 83 | file_nextcloud_href = os.path.join(WebDAV.API_URL, self.user_username, file_name) |
||
| 84 | folder_info = self.nxc_local.list_folders(self.user_username, path=file_name) |
||
| 85 | |||
| 86 | assert folder_info.is_ok |
||
| 87 | assert len(folder_info.data) == 1 |
||
| 88 | assert isinstance(folder_info.data[0], dict) |
||
| 89 | # check href |
||
| 90 | assert folder_info.data[0]['href'] == file_nextcloud_href |
||
| 91 | # test timestamp of uploaded file in Nextcloud |
||
| 92 | assert folder_info.data[0]["last_modified"] == timestamp_str |
||
| 93 | |||
| 94 | # remove file on local machine |
||
| 95 | os.remove(file_local_path) |
||
| 96 | self.nxc_local.download_file(self.user_username, file_name) |
||
| 97 | |||
| 98 | # test file is downloaded to current dir |
||
| 99 | assert file_name in os.listdir(".") |
||
| 100 | with open(file_local_path, 'r') as f: |
||
| 101 | downloaded_file_content = f.read() |
||
| 102 | assert downloaded_file_content == file_content |
||
| 103 | |||
| 104 | # test timestamp of downloaded file |
||
| 105 | downloaded_file_timestamp = os.path.getmtime(file_local_path) |
||
| 106 | assert downloaded_file_timestamp == timestamp |
||
| 107 | |||
| 108 | # delete file |
||
| 109 | self.nxc_local.delete_path(self.user_username, file_name) |
||
| 110 | os.remove(file_local_path) |
||
| 111 | |||
| 112 | def test_upload_download_file_using_local_file_property(self): |
||
| 113 | file_name = "test_file_1000000000" |
||
| 114 | file_content = "Test file: Sun, 09 Sep 2001 01:46:40 GMT" |
||
| 115 | |||
| 116 | # create test file with timestamp: Sun, 09 Sep 2001 01:46:40 GMT |
||
| 117 | timestamp_str = "Sun, 09 Sep 2001 01:46:40 GMT" |
||
| 118 | timestamp = timestamp_to_epoch_time(timestamp_str) |
||
| 119 | assert timestamp == 1000000000 |
||
| 120 | |||
| 121 | # create test file |
||
| 122 | with open(file_name, "w") as f: |
||
| 123 | f.write(file_content) |
||
| 124 | file_local_path = os.path.abspath(file_name) |
||
| 125 | |||
| 126 | # set timestamp to saved test file |
||
| 127 | os.utime(file_local_path, (timestamp, timestamp)) |
||
| 128 | |||
| 129 | # upload test file, timestamp comes from local file's property |
||
| 130 | res = self.nxc_local.upload_file(self.user_username, file_local_path, file_name, timestamp=None) |
||
| 131 | |||
| 132 | # check status code |
||
| 133 | assert res.is_ok |
||
| 134 | assert res.raw.status_code == self.CREATED_CODE |
||
| 135 | |||
| 136 | # test uploaded file can be found with list_folders |
||
| 137 | file_nextcloud_href = os.path.join(WebDAV.API_URL, self.user_username, file_name) |
||
| 138 | folder_info = self.nxc_local.list_folders(self.user_username, path=file_name) |
||
| 139 | |||
| 140 | assert folder_info.is_ok |
||
| 141 | assert len(folder_info.data) == 1 |
||
| 142 | assert isinstance(folder_info.data[0], dict) |
||
| 143 | # check href |
||
| 144 | assert folder_info.data[0]['href'] == file_nextcloud_href |
||
| 145 | # test timestamp of uploaded file in Nextcloud |
||
| 146 | assert folder_info.data[0]["last_modified"] == timestamp_str |
||
| 147 | |||
| 148 | # remove file on local machine |
||
| 149 | os.remove(file_local_path) |
||
| 150 | self.nxc_local.download_file(self.user_username, file_name) |
||
| 151 | |||
| 152 | # test file is downloaded to current dir |
||
| 153 | assert file_name in os.listdir(".") |
||
| 154 | with open(file_local_path, 'r') as f: |
||
| 155 | downloaded_file_content = f.read() |
||
| 156 | assert downloaded_file_content == file_content |
||
| 157 | |||
| 158 | # test timestamp of downloaded file |
||
| 159 | downloaded_file_timestamp = os.path.getmtime(file_local_path) |
||
| 160 | assert downloaded_file_timestamp == timestamp |
||
| 161 | |||
| 162 | # delete file |
||
| 163 | self.nxc_local.delete_path(self.user_username, file_name) |
||
| 164 | os.remove(file_local_path) |
||
| 165 | |||
| 166 | |||
| 167 | def test_create_folder(self): |
||
| 168 | folder_name = "test folder5" |
||
| 169 | res = self.nxc_local.create_folder(self.user_username, folder_name) |
||
| 170 | assert res.is_ok |
||
| 171 | assert res.raw.status_code == self.CREATED_CODE |
||
| 172 | |||
| 173 | # test uploaded file can be found with list_folders |
||
| 174 | file_nextcloud_href = quote(os.path.join(WebDAV.API_URL, self.user_username, folder_name)) + "/" |
||
| 175 | folder_info = self.nxc_local.list_folders(self.user_username, path=folder_name) |
||
| 176 | assert folder_info.is_ok |
||
| 177 | assert len(folder_info.data) == 1 |
||
| 178 | assert isinstance(folder_info.data[0], dict) |
||
| 179 | # check href |
||
| 180 | assert folder_info.data[0]['href'] == file_nextcloud_href |
||
| 181 | # check that created file type is a collection |
||
| 182 | assert folder_info.data[0]['resource_type'] == self.COLLECTION_TYPE |
||
| 183 | |||
| 184 | nested_folder_name = "test folder5/nested/folder" |
||
| 185 | res = self.nxc_local.assure_tree_exists(self.user_username, nested_folder_name) |
||
| 186 | folder_info = self.nxc_local.list_folders(self.user_username, path=nested_folder_name) |
||
| 187 | assert folder_info.is_ok |
||
| 188 | assert len(folder_info.data) == 1 |
||
| 189 | |||
| 190 | # check 405 status code if location already exists |
||
| 191 | res = self.nxc_local.create_folder(self.user_username, folder_name) |
||
| 192 | assert not res.is_ok |
||
| 193 | assert res.raw.status_code == self.ALREADY_EXISTS_CODE |
||
| 194 | |||
| 195 | # delete folder |
||
| 196 | res = self.nxc_local.delete_path(self.user_username, folder_name) |
||
| 197 | assert res.is_ok |
||
| 198 | assert res.raw.status_code == self.NO_CONTENT_CODE |
||
| 199 | |||
| 200 | def test_delete_path(self): |
||
| 201 | # test delete empty folder |
||
| 202 | new_path_name = "path_to_delete" |
||
| 203 | self.nxc_local.create_folder(self.user_username, new_path_name) |
||
| 204 | res = self.nxc_local.delete_path(self.user_username, new_path_name) |
||
| 205 | assert res.raw.status_code == self.NO_CONTENT_CODE |
||
| 206 | assert res.is_ok |
||
| 207 | res = self.nxc_local.list_folders(self.user_username, new_path_name) |
||
| 208 | assert res.data is None |
||
| 209 | |||
| 210 | # test delete file |
||
| 211 | # create file at first |
||
| 212 | file_name = "test_file" |
||
| 213 | file_content = "test file content" |
||
| 214 | with open(file_name, "w") as f: |
||
| 215 | f.write(file_content) |
||
| 216 | file_local_path = os.path.abspath(file_name) |
||
| 217 | self.nxc_local.upload_file(self.user_username, file_local_path, file_name) |
||
| 218 | # delete file |
||
| 219 | res = self.nxc_local.delete_path(self.user_username, file_name) |
||
| 220 | assert res.raw.status_code == self.NO_CONTENT_CODE |
||
| 221 | assert res.is_ok |
||
| 222 | res = self.nxc_local.list_folders(self.user_username, new_path_name) |
||
| 223 | assert res.data is None |
||
| 224 | |||
| 225 | # test delete nonexistent file |
||
| 226 | res = self.nxc_local.delete_path(self.user_username, file_name) |
||
| 227 | assert res.raw.status_code == self.NOT_FOUND_CODE |
||
| 228 | assert not res.is_ok |
||
| 229 | |||
| 230 | View Code Duplication | def test_copy_path(self): |
|
|
0 ignored issues
–
show
Duplication
introduced
by
Loading history...
|
|||
| 231 | # create a file to copy |
||
| 232 | file_name = "test_file" |
||
| 233 | file_content = "test file content" |
||
| 234 | self.create_and_upload_file(file_name, file_content) |
||
| 235 | |||
| 236 | # copy file |
||
| 237 | destination_path = "new_test_file_location" |
||
| 238 | res = self.nxc_local.copy_path(self.user_username, file_name, destination_path) |
||
| 239 | assert res.raw.status_code == self.CREATED_CODE |
||
| 240 | assert res.is_ok |
||
| 241 | # check both file exist |
||
| 242 | original_file_props = self.nxc_local.list_folders(self.user_username, file_name) |
||
| 243 | copy_props = self.nxc_local.list_folders(self.user_username, destination_path) |
||
| 244 | assert len(original_file_props.data) == 1 |
||
| 245 | assert len(copy_props.data) == 1 |
||
| 246 | |||
| 247 | # copy file to already exist location |
||
| 248 | # create new file |
||
| 249 | new_file_name = 'test_file_2' |
||
| 250 | new_file_content = 'test_file_3' |
||
| 251 | self.create_and_upload_file(new_file_name, new_file_content) |
||
| 252 | res = self.nxc_local.copy_path(self.user_username, file_name, new_file_name) |
||
| 253 | assert not res.is_ok |
||
| 254 | assert res.raw.status_code == self.PRECONDITION_FAILED_CODE |
||
| 255 | # copy with overriding |
||
| 256 | res = self.nxc_local.copy_path(self.user_username, file_name, new_file_name, overwrite=True) |
||
| 257 | assert res.is_ok |
||
| 258 | assert res.raw.status_code == self.NO_CONTENT_CODE |
||
| 259 | |||
| 260 | # download just copied file and check content |
||
| 261 | os.remove(os.path.join(os.getcwd(), new_file_name)) # remove file locally to download it |
||
| 262 | self.nxc_local.download_file(self.user_username, new_file_name) |
||
| 263 | with open(new_file_name, 'r') as f: |
||
| 264 | downloaded_file_content = f.read() |
||
| 265 | assert downloaded_file_content == file_content |
||
| 266 | assert downloaded_file_content != new_file_content |
||
| 267 | |||
| 268 | View Code Duplication | def test_move_path(self): |
|
|
0 ignored issues
–
show
|
|||
| 269 | # create a file to move |
||
| 270 | file_name = "test_move_file 🇳🇴 😗 🇫🇴 🇦🇽" |
||
| 271 | file_content = "test move file content 🇳🇴 😗 🇫🇴 🇦🇽" |
||
| 272 | self.create_and_upload_file(file_name, file_content) |
||
| 273 | |||
| 274 | # move file |
||
| 275 | destination_path = "new_test_move_file_location 🇳🇴 😗 🇫🇴 🇦🇽" |
||
| 276 | res = self.nxc_local.move_path(self.user_username, file_name, destination_path) |
||
| 277 | assert res.is_ok |
||
| 278 | assert res.raw.status_code == self.CREATED_CODE |
||
| 279 | # check only new file exist |
||
| 280 | original_file_props = self.nxc_local.list_folders(self.user_username, file_name) |
||
| 281 | moved_file = self.nxc_local.list_folders(self.user_username, destination_path) |
||
| 282 | assert original_file_props.data is None |
||
| 283 | assert len(moved_file.data) == 1 |
||
| 284 | |||
| 285 | # copy file to already exist location |
||
| 286 | |||
| 287 | # create a file to move |
||
| 288 | file_name = "test_move_file 🇳🇴 😗 🇫🇴 🇦🇽" |
||
| 289 | file_content = "test move file content 🇳🇴 😗 🇫🇴 🇦🇽" |
||
| 290 | self.create_and_upload_file(file_name, file_content) |
||
| 291 | |||
| 292 | # create new file for conflict |
||
| 293 | new_file_name = 'test_move_file_ 🇳🇴 😗 🇫🇴 🇦🇽' |
||
| 294 | new_file_content = 'test_move_file_ 🇳🇴 😗 🇫🇴 🇦🇽' |
||
| 295 | self.create_and_upload_file(new_file_name, new_file_content) |
||
| 296 | |||
| 297 | # move file to the new file location |
||
| 298 | res = self.nxc_local.move_path(self.user_username, file_name, new_file_name) |
||
| 299 | assert not res.is_ok |
||
| 300 | assert res.raw.status_code == self.PRECONDITION_FAILED_CODE |
||
| 301 | # move with overriding |
||
| 302 | res = self.nxc_local.move_path(self.user_username, file_name, new_file_name, overwrite=True) |
||
| 303 | assert res.is_ok |
||
| 304 | assert res.raw.status_code == self.NO_CONTENT_CODE |
||
| 305 | |||
| 306 | # download just copied file and check content |
||
| 307 | os.remove(os.path.join(os.getcwd(), new_file_name)) # remove file locally to download it |
||
| 308 | self.nxc_local.download_file(self.user_username, new_file_name) |
||
| 309 | with open(new_file_name, 'r') as f: |
||
| 310 | downloaded_file_content = f.read() |
||
| 311 | assert downloaded_file_content == file_content |
||
| 312 | assert downloaded_file_content != new_file_content |
||
| 313 | |||
| 314 | def test_set_list_favorites(self): |
||
| 315 | # create new file to make favorite |
||
| 316 | file_name = "test_favorite" |
||
| 317 | file_content = "test favorite content" |
||
| 318 | self.create_and_upload_file(file_name, file_content) |
||
| 319 | file_nextcloud_href = os.path.join(WebDAV.API_URL, self.user_username, file_name) |
||
| 320 | |||
| 321 | # get favorites |
||
| 322 | res = self.nxc_local.list_favorites(self.user_username) |
||
| 323 | assert len(res.data) == 0 |
||
| 324 | |||
| 325 | # set file as favorite |
||
| 326 | res = self.nxc_local.set_favorites(self.user_username, file_name) |
||
| 327 | assert res.is_ok |
||
| 328 | assert res.raw.status_code == self.MULTISTATUS_CODE |
||
| 329 | |||
| 330 | # check file is in favorites |
||
| 331 | res = self.nxc_local.list_favorites(self.user_username) |
||
| 332 | assert res.is_ok |
||
| 333 | assert len(res.data) == 1 |
||
| 334 | assert res.data[0]['href'] == file_nextcloud_href |
||
| 335 | |||
| 336 | def test_timestamp_to_epoch_time(self): |
||
| 337 | timestamp_str = "Thu, 01 Dec 1994 16:00:00 GMT" |
||
| 338 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 339 | assert timestamp_unix_time == 786297600 |
||
| 340 | assert timestamp_str == datetime.utcfromtimestamp(timestamp_unix_time).strftime('%a, %d %b %Y %H:%M:%S GMT') |
||
| 341 | |||
| 342 | timestamp_str = "Fri, 14 Jul 2017 02:40:00 GMT" |
||
| 343 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 344 | assert timestamp_unix_time == 1500000000 |
||
| 345 | assert timestamp_str == datetime.utcfromtimestamp(timestamp_unix_time).strftime('%a, %d %b %Y %H:%M:%S GMT') |
||
| 346 | |||
| 347 | # UTM timezone is invalid for WebDav |
||
| 348 | timestamp_str = "Thu, 01 Dec 1994 16:00:00 UTM" |
||
| 349 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 350 | assert timestamp_unix_time is None |
||
| 351 | assert timestamp_unix_time != 786297600 |
||
| 352 | |||
| 353 | # RFC 850 (part of http-date) format is invalid for WebDav |
||
| 354 | timestamp_str = "Sunday, 06-Nov-94 08:49:37 GMT" |
||
| 355 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 356 | assert timestamp_unix_time is None |
||
| 357 | assert timestamp_unix_time != 784111777 |
||
| 358 | |||
| 359 | # ISO 8601 is invalid for WebDav |
||
| 360 | timestamp_str = "2007-03-01T13:00:00Z" |
||
| 361 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 362 | assert timestamp_unix_time is None |
||
| 363 | assert timestamp_unix_time != 1172754000 |
||
| 364 | |||
| 365 | # broken date string |
||
| 366 | timestamp_str = " " |
||
| 367 | timestamp_unix_time = timestamp_to_epoch_time(timestamp_str) |
||
| 368 | assert timestamp_unix_time is None |
||
| 369 |