Provider.getResourceInst()   F
last analyzed

Complexity

Conditions 21

Size

Total Lines 165
Code Lines 119

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 119
dl 0
loc 165
rs 0
c 0
b 0
f 0
cc 21
nop 3

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like backend.tracim_backend.lib.webdav.dav_provider.Provider.getResourceInst() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding: utf8
2
3
import re
4
from os.path import basename, dirname
5
6
from sqlalchemy.orm.exc import NoResultFound
7
8
from tracim_backend import CFG
9
from tracim_backend.lib.webdav.utils import transform_to_bdd, HistoryType, \
10
    SpecialFolderExtension
11
from tracim_backend.app_models.contents import content_type_list
12
13
from wsgidav.dav_provider import DAVProvider
14
from wsgidav.lock_manager import LockManager
15
16
17
from tracim_backend.lib.webdav.lock_storage import LockStorage
18
from tracim_backend.lib.core.content import ContentApi
19
from tracim_backend.lib.core.content import ContentRevisionRO
20
from tracim_backend.lib.core.workspace import WorkspaceApi
21
from tracim_backend.lib.webdav import resources
22
from tracim_backend.lib.webdav.utils import normpath
23
from tracim_backend.models.data import Content
24
from tracim_backend.models.data import Workspace
25
26
27
class Provider(DAVProvider):
28
    """
29
    This class' role is to provide to wsgidav _DAVResource. Wsgidav will then use them to execute action and send
30
    informations to the client
31
    """
32
33
    def __init__(
34
            self,
35
            app_config: CFG,
36
            show_history=True,
37
            show_deleted=True,
38
            show_archived=True,
39
            manage_locks=True,
40
    ):
41
        super(Provider, self).__init__()
42
43
        if manage_locks:
44
            self.lockManager = LockManager(LockStorage())
45
46
        self.app_config = app_config
47
        self._show_archive = show_archived
48
        self._show_delete = show_deleted
49
        self._show_history = show_history
50
51
    def show_history(self):
52
        return self._show_history
53
54
    def show_delete(self):
55
        return self._show_delete
56
57
    def show_archive(self):
58
        return self._show_archive
59
60
    #########################################################
61
    # Everything override from DAVProvider
62
    def getResourceInst(self, path: str, environ: dict):
63
        """
64
        Called by wsgidav whenever a request is called to get the _DAVResource corresponding to the path
65
        """
66
        user = environ['tracim_user']
67
        session = environ['tracim_dbsession']
68
        if not self.exists(path, environ):
69
            return None
70
        path = normpath(path)
71
        root_path = environ['http_authenticator.realm']
72
73
        # If the requested path is the root, then we return a RootResource resource
74
        if path == root_path:
75
            return resources.RootResource(
76
                path=path,
77
                environ=environ,
78
                user=user,
79
                session=session
80
            )
81
82
        workspace_api = WorkspaceApi(
83
            current_user=user,
84
            session=session,
85
            config=self.app_config,
86
        )
87
        workspace = self.get_workspace_from_path(path, workspace_api)
88
89
        # If the request path is in the form root/name, then we return a WorkspaceResource resource
90
        parent_path = dirname(path)
91
        if parent_path == root_path:
92
            if not workspace:
93
                return None
94
            return resources.WorkspaceResource(
95
                path=path,
96
                environ=environ,
97
                workspace=workspace,
98
                user=user,
99
                session=session,
100
            )
101
102
        # And now we'll work on the path to establish which type or resource is requested
103
104
        content_api = ContentApi(
105
            current_user=user,
106
            session=session,
107
            config=self.app_config,
108
            show_archived=False,  # self._show_archive,
109
            show_deleted=False,  # self._show_delete
110
        )
111
112
        content = self.get_content_from_path(
113
            path=path,
114
            content_api=content_api,
115
            workspace=workspace
116
        )
117
118
119
        # Easy cases : path either end with /.deleted, /.archived or /.history, then we return corresponding resources
120
        if path.endswith(SpecialFolderExtension.Archived) and self._show_archive:
121
            return resources.ArchivedFolderResource(
122
                path=path,
123
                environ=environ,
124
                workspace=workspace,
125
                user=user,
126
                content=content,
127
                session=session,
128
            )
129
130
        if path.endswith(SpecialFolderExtension.Deleted) and self._show_delete:
131
            return resources.DeletedFolderResource(
132
                path=path,
133
                environ=environ,
134
                workspace=workspace,
135
                user=user,
136
                content=content,
137
                session=session,
138
            )
139
140
        if path.endswith(SpecialFolderExtension.History) and self._show_history:
141
            is_deleted_folder = re.search(r'/\.deleted/\.history$', path) is not None
142
            is_archived_folder = re.search(r'/\.archived/\.history$', path) is not None
143
144
            type = HistoryType.Deleted if is_deleted_folder \
145
                else HistoryType.Archived if is_archived_folder \
146
                else HistoryType.Standard
147
148
            return resources.HistoryFolderResource(
149
                path=path,
150
                environ=environ,
151
                workspace=workspace,
152
                user=user,
153
                content=content,
154
                session=session,
155
                type=type
156
            )
157
158
        # Now that's more complicated, we're trying to find out if the path end with /.history/file_name
159
        is_history_file_folder = re.search(r'/\.history/([^/]+)$', path) is not None
160
161
        if is_history_file_folder and self._show_history:
162
            return resources.HistoryFileFolderResource(
163
                path=path,
164
                environ=environ,
165
                user=user,
166
                content=content,
167
                session=session,
168
            )
169
        # And here next step :
170
        is_history_file = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) .+', path) is not None
171
172
        if self._show_history and is_history_file:
173
174
            revision_id = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path).group(1)
175
176
            content_revision = content_api.get_one_revision(revision_id)
177
            content = self.get_content_from_revision(content_revision, content_api)
178
179
            if content.type == content_type_list.File.slug:
180
                return resources.HistoryFileResource(
181
                    path=path,
182
                    environ=environ,
183
                    user=user,
184
                    content=content,
185
                    content_revision=content_revision,
186
                    session=session,
187
                )
188
            else:
189
                return resources.HistoryOtherFile(
190
                    path=path,
191
                    environ=environ,
192
                    user=user,
193
                    content=content,
194
                    content_revision=content_revision,
195
                    session=session,
196
                )
197
198
        # And if we're still going, the client is asking for a standard Folder/File/Page/Thread so we check the type7
199
        # and return the corresponding resource
200
201
        if content is None:
202
            return None
203
        if content.type == content_type_list.Folder.slug:
204
            return resources.FolderResource(
205
                path=path,
206
                environ=environ,
207
                workspace=content.workspace,
208
                content=content,
209
                session=session,
210
                user=user,
211
            )
212
        elif content.type == content_type_list.File.slug:
213
            return resources.FileResource(
214
                path=path,
215
                environ=environ,
216
                content=content,
217
                session=session,
218
                user=user
219
            )
220
        else:
221
            return resources.OtherFileResource(
222
                path=path,
223
                environ=environ,
224
                content=content,
225
                session=session,
226
                user=user,
227
            )
228
229
    def exists(self, path, environ) -> bool:
230
        """
231
        Called by wsgidav to check if a certain path is linked to a _DAVResource
232
        """
233
234
        path = normpath(path)
235
        working_path = self.reduce_path(path)
236
        root_path = environ['http_authenticator.realm']
237
        parent_path = dirname(working_path)
238
        user = environ['tracim_user']
239
        session = environ['tracim_dbsession']
240
        if path == root_path:
241
            return True
242
243
        workspace = self.get_workspace_from_path(
244
            path,
245
            WorkspaceApi(
246
                current_user=user,
247
                session=session,
248
                config=self.app_config,
249
            )
250
        )
251
252
        if parent_path == root_path or workspace is None:
253
            return workspace is not None
254
255
        # TODO bastien: Arnaud avait mis a True, verif le comportement
256
        # lorsque l'on explore les dossiers archive et deleted
257
        content_api = ContentApi(
258
            current_user=user,
259
            session=session,
260
            config=self.app_config,
261
            show_archived=False,
262
            show_deleted=False
263
        )
264
265
        revision_id = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path)
266
267
        is_archived = self.is_path_archive(path)
268
269
        is_deleted = self.is_path_delete(path)
270
271
        if revision_id:
272
            revision_id = revision_id.group(1)
273
            content = content_api.get_one_revision(revision_id)
274
        else:
275
            content = self.get_content_from_path(working_path, content_api, workspace)
276
277
        return content is not None \
278
            and content.is_deleted == is_deleted \
279
            and content.is_archived == is_archived
280
281
    def is_path_archive(self, path):
282
        """
283
        This function will check if a given path is linked to a file that's archived or not. We're checking if the
284
        given path end with one of these string :
285
286
        ex:
287
            - /a/b/.archived/my_file
288
            - /a/b/.archived/.history/my_file
289
            - /a/b/.archived/.history/my_file/(3615 - edition) my_file
290
        """
291
292
        return re.search(
293
            r'/\.archived/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$',
294
            path
295
        ) is not None
296
297
    def is_path_delete(self, path):
298
        """
299
        This function will check if a given path is linked to a file that's deleted or not. We're checking if the
300
        given path end with one of these string :
301
302
        ex:
303
            - /a/b/.deleted/my_file
304
            - /a/b/.deleted/.history/my_file
305
            - /a/b/.deleted/.history/my_file/(3615 - edition) my_file
306
        """
307
308
        return re.search(
309
            r'/\.deleted/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$',
310
            path
311
        ) is not None
312
313
    def reduce_path(self, path: str) -> str:
314
        """
315
        As we use the given path to request the database
316
317
        ex: if the path is /a/b/.deleted/c/.archived, we're trying to get the archived content of the 'c' resource,
318
        we need to keep the path /a/b/c
319
320
        ex: if the path is /a/b/.history/my_file, we're trying to get the history of the file my_file, thus we need
321
        the path /a/b/my_file
322
323
        ex: if the path is /a/b/.history/my_file/(1985 - edition) my_old_name, we're looking for,
324
        thus we remove all useless information
325
        """
326
        path = re.sub(r'/\.archived', r'', path)
327
        path = re.sub(r'/\.deleted', r'', path)
328
        path = re.sub(r'/\.history/[^/]+/(\d+)-.+', r'/\1', path)
329
        path = re.sub(r'/\.history/([^/]+)', r'/\1', path)
330
        path = re.sub(r'/\.history', r'', path)
331
332
        return path
333
334
    def get_content_from_path(self, path, content_api: ContentApi, workspace: Workspace) -> Content:
335
        """
336
        Called whenever we want to get the Content item from the database for a given path
337
        """
338
        path = self.reduce_path(path)
339
        parent_path = dirname(path)
340
341
        relative_parents_path = parent_path[len(workspace.label)+1:]
342
        parents = relative_parents_path.split('/')
343
344
        try:
345
            parents.remove('')
346
        except ValueError:
347
            pass
348
        parents = [transform_to_bdd(x) for x in parents]
349
350
        try:
351
            return content_api.get_one_by_label_and_parent_labels(
352
                content_label=transform_to_bdd(basename(path)),
353
                content_parent_labels=parents,
354
                workspace=workspace,
355
            )
356
        except NoResultFound:
357
            return None
358
359
    def get_content_from_revision(self, revision: ContentRevisionRO, api: ContentApi) -> Content:
360
        try:
361
            return api.get_one(revision.content_id, content_type_list.Any_SLUG)
362
        except NoResultFound:
363
            return None
364
365
    def get_parent_from_path(self, path, api: ContentApi, workspace) -> Content:
366
        return self.get_content_from_path(dirname(path), api, workspace)
367
368
    def get_workspace_from_path(self, path: str, api: WorkspaceApi) -> Workspace:
369
        try:
370
            return api.get_one_by_label(transform_to_bdd(path.split('/')[1]))
371
        except NoResultFound:
372
            return None
373