WorkspaceApi.save()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
import typing
3
4
from sqlalchemy.orm import Query
5
from sqlalchemy.orm import Session
6
from sqlalchemy.orm.exc import NoResultFound
7
8
from tracim_backend.config import CFG
9
from tracim_backend.exceptions import EmptyLabelNotAllowed
10
from tracim_backend.exceptions import WorkspaceLabelAlreadyUsed
11
from tracim_backend.exceptions import WorkspaceNotFound
12
from tracim_backend.lib.core.userworkspace import RoleApi
13
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
14
from tracim_backend.lib.utils.translation import Translator
15
from tracim_backend.models.auth import Group
16
from tracim_backend.models.auth import User
17
from tracim_backend.models.context_models import WorkspaceInContext
18
from tracim_backend.models.data import UserRoleInWorkspace
19
from tracim_backend.models.data import Workspace
20
21
__author__ = 'damien'
22
23
24
class WorkspaceApi(object):
25
26
    def __init__(
            self,
            session: Session,
            current_user: User,
            config: CFG,
            force_role: bool=False,
            show_deleted: bool=False,
    ):
        """
        Bind the API to a database session, a user and a configuration.

        :param session: database session used for queries and flushes
        :param current_user: Current user of context; may be falsy, in which
            case the fallback language is used for translations
        :param config: application configuration, handed to the translator
        :param force_role: If True, app role in queries even if admin
        :param show_deleted: If True, deleted workspaces are not filtered
            out of the base query
        """
        self._session = session
        self._user = current_user
        self._config = config
        self._force_role = force_role
        self.show_deleted = show_deleted

        # Use the user's language when there is a user and a language is set,
        # otherwise fall back to the default one.
        user_lang = self._user.lang if self._user else None
        self.translator = Translator(
            app_config=self._config,
            default_lang=user_lang or DEFAULT_FALLBACK_LANG,
        )
50
51
    def _base_query_without_roles(self):
        """
        Build the workspace query that does not take user roles into account.

        Deleted workspaces are filtered out unless ``show_deleted`` is set.

        :return: query on Workspace
        """
        all_workspaces = self._session.query(Workspace)
        if self.show_deleted:
            return all_workspaces
        # Column comparison: "== False" is what builds the SQL clause here.
        return all_workspaces.filter(Workspace.is_deleted == False)  # nopep8
56
57
    def _base_query(self):
        """
        Build the workspace query restricted to what the current user can see.

        When ``force_role`` is not set and the user's profile id is at least
        ``Group.TIM_ADMIN``, the role-less query is returned as is. Otherwise
        the query is joined on roles and limited to the current user's roles.

        :return: query on Workspace
        """
        bypass_roles = (
            not self._force_role
            and self._user.profile.id >= Group.TIM_ADMIN
        )
        query = self._base_query_without_roles()
        if bypass_roles:
            return query
        return query.join(Workspace.roles).filter(
            UserRoleInWorkspace.user_id == self._user.user_id
        )
65
66
    def get_workspace_with_context(
            self,
            workspace: Workspace
    ) -> WorkspaceInContext:
        """
        Wrap a Workspace into a WorkspaceInContext object.

        :param workspace: workspace to wrap
        :return: WorkspaceInContext built with this API's session and config
        """
        return WorkspaceInContext(
            workspace=workspace,
            dbsession=self._session,
            config=self._config,
        )
79
80
    def create_workspace(
            self,
            label: str='',
            description: str='',
            calendar_enabled: bool=False,
            save_now: bool=False,
    ) -> Workspace:
        """
        Create a workspace and give the current user the manager role on it.

        :param label: label of the new workspace, must not be empty
        :param description: description of the new workspace
        :param calendar_enabled: value stored on the new workspace
        :param save_now: if True, flush the session once objects are added
        :raise EmptyLabelNotAllowed: if label is empty
        :raise WorkspaceLabelAlreadyUsed: if a workspace already has this label
        :return: the new workspace
        """
        if not label:
            raise EmptyLabelNotAllowed('Workspace label cannot be empty')

        # The uniqueness check queries Workspace directly, so it is not
        # limited by the role / deleted filters of the base queries.
        same_label_count = self._session.query(Workspace) \
            .filter(Workspace.label == label) \
            .count()
        if same_label_count > 0:
            raise WorkspaceLabelAlreadyUsed(
                'A workspace with label {} already exist.'.format(label)
            )

        new_workspace = Workspace()
        new_workspace.label = label
        new_workspace.description = description
        new_workspace.calendar_enabled = calendar_enabled

        # By default, we force the current user to be the workspace manager
        # And to receive email notifications
        roles = RoleApi(
            session=self._session,
            current_user=self._user,
            config=self._config
        )
        manager_role = roles.create_one(
            self._user,
            new_workspace,
            UserRoleInWorkspace.WORKSPACE_MANAGER,
            with_notif=True,
        )

        for created in (new_workspace, manager_role):
            self._session.add(created)

        if save_now:
            self._session.flush()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable calendar stuff
        # if calendar_enabled:
        #     self._ensure_calendar_exist(workspace)
        # else:
        #     self._disable_calendar(workspace)

        return new_workspace
127
128
    def update_workspace(
            self,
            workspace: Workspace,
            label: str,
            description: str,
            save_now: bool=False,
    ) -> Workspace:
        """
        Set a new label and description on a workspace.

        :param workspace: workspace to update
        :param label: new label of workspace, must not be empty
        :param description: new description
        :param save_now: if True, save (flush) right after the update
        :raise EmptyLabelNotAllowed: if label is empty
        :return: updated workspace
        """
        if not label:
            raise EmptyLabelNotAllowed('Workspace label cannot be empty')

        workspace.label, workspace.description = label, description

        if not save_now:
            return workspace
        self.save(workspace)
        return workspace
152
153
    def get_one(self, id):
        """
        Fetch a single workspace by id among those of the base query.

        :param id: workspace_id of the wanted workspace
        :raise WorkspaceNotFound: if the base query yields no such workspace
        :return: the matching workspace
        """
        try:
            query = self._base_query()
            return query.filter(Workspace.workspace_id == id).one()
        except NoResultFound as exc:
            message = 'workspace {} does not exist or not visible for user'.format(id)  # nopep8
            raise WorkspaceNotFound(message) from exc
158
159
    def get_one_by_label(self, label: str) -> Workspace:
        """
        Fetch a single workspace by label among those of the base query.

        :param label: label of the wanted workspace
        :raise WorkspaceNotFound: if the base query yields no such workspace
        :return: the matching workspace
        """
        try:
            return self._base_query().filter(Workspace.label == label).one()
        except NoResultFound as exc:
            # Report the label that was searched for; "id" in this scope is
            # the Python builtin, not a workspace identifier.
            raise WorkspaceNotFound('workspace {} does not exist or not visible for user'.format(label)) from exc  # nopep8
164
165
    """
166
    def get_one_for_current_user(self, id):
167
        return self._base_query().filter(Workspace.workspace_id==id).\
168
            session.query(ZKContact).filter(ZKContact.groups.any(ZKGroup.id.in_([1,2,3])))
169
            filter(sqla.).one()
170
    """
171
172
    def get_all(self):
        """
        Return every workspace matched by the base query.

        :return: result of ``all()`` on the base query
        """
        query = self._base_query()
        return query.all()
174
175
    def get_all_for_user(self, user: User, ignored_ids=None):
        """
        List the non-deleted workspaces in which a user has a role.

        :param user: user whose roles are inspected
        :param ignored_ids: workspace ids to leave out of the result; when
            None or empty, nothing is left out
        :return: list of workspaces sorted by lower-cased label
        """
        kept = [
            role.workspace
            for role in user.roles
            if not role.workspace.is_deleted and (
                not ignored_ids
                or role.workspace.workspace_id not in ignored_ids
            )
        ]
        return sorted(kept, key=lambda item: item.label.lower())
189
190
    def get_all_manageable(self) -> typing.List[Workspace]:
        """
        Get all workspaces the current user has manager rights on.

        A user whose profile id is ``Group.TIM_ADMIN`` gets every workspace of
        the base query; one whose profile id is ``Group.TIM_MANAGER`` only gets
        those where the role is workspace manager; any other profile gets an
        empty list. Results are ordered by label.
        """
        profile_id = self._user.profile.id

        if profile_id == Group.TIM_ADMIN:
            return self._base_query().order_by(Workspace.label).all()

        if profile_id == Group.TIM_MANAGER:
            managed_only = self._base_query().filter(
                UserRoleInWorkspace.role ==
                UserRoleInWorkspace.WORKSPACE_MANAGER
            )
            return managed_only.order_by(Workspace.label).all()

        return []
204
205
    def disable_notifications(self, user: User, workspace: Workspace):
        """
        Turn notifications off on each role the user has in the workspace.

        :param user: user whose roles are updated
        :param workspace: workspace the roles must belong to
        """
        roles_in_workspace = (
            user_role for user_role in user.roles
            if user_role.workspace == workspace
        )
        for user_role in roles_in_workspace:
            user_role.do_notify = False
209
210
    def enable_notifications(self, user: User, workspace: Workspace):
        """
        Turn notifications on for each role the user has in the workspace.

        :param user: user whose roles are updated
        :param workspace: workspace the roles must belong to
        """
        roles_in_workspace = (
            user_role for user_role in user.roles
            if user_role.workspace == workspace
        )
        for user_role in roles_in_workspace:
            user_role.do_notify = True
214
215
    def get_notifiable_roles(
            self,
            workspace: Workspace,
    ) -> typing.List[UserRoleInWorkspace]:
        """
        List the roles of a workspace that should receive notifications.

        A role is kept when its ``do_notify`` flag is set, its user is not the
        current user and its user is active.

        :param workspace: workspace whose roles are inspected
        :return: list of matching roles, in the order of ``workspace.roles``
        """
        return [
            role for role in workspace.roles
            if role.do_notify
            and role.user != self._user
            and role.user.is_active
        ]
223
224
    def save(self, workspace: Workspace):
        """
        Flush the session.

        :param workspace: workspace being saved; it is not used directly,
            the whole session is flushed
        """
        session = self._session
        session.flush()
226
227
    def delete(self, workspace: Workspace, flush=True):
        """
        Mark a workspace as deleted.

        :param workspace: workspace to flag as deleted
        :param flush: if True, flush the session afterwards
        """
        workspace.is_deleted = True

        if not flush:
            return
        self._session.flush()
232
233
    def undelete(self, workspace: Workspace, flush=True):
        """
        Clear the deleted flag of a workspace.

        :param workspace: workspace to restore
        :param flush: if True, flush the session afterwards
        :return: the same workspace
        """
        workspace.is_deleted = False

        if not flush:
            return workspace
        self._session.flush()
        return workspace
240
241
    def execute_created_workspace_actions(self, workspace: Workspace) -> None:
        """
        Placeholder for actions on a created workspace; currently a no-op.

        :param workspace: workspace concerned (unused for now)
        """
        # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
        # self.ensure_calendar_exist(workspace)
        return None
245
246
    # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
247
    # def ensure_calendar_exist(self, workspace: Workspace) -> None:
248
    #     # Note: Cyclic imports
249
    #     from tracim.lib.calendar import CalendarManager
250
    #     from tracim.model.organisational import WorkspaceCalendar
251
    #
252
    #     calendar_manager = CalendarManager(self._user)
253
    #
254
    #     try:
255
    #         calendar_manager.enable_calendar_file(
256
    #             calendar_class=WorkspaceCalendar,
257
    #             related_object_id=workspace.workspace_id,
258
    #             raise_=True,
259
    #         )
260
    #     # If previous calendar file no exist, calendar must be created
261
    #     except FileNotFoundError:
262
    #         self._user.ensure_auth_token()
263
    #
264
    #         # Ensure database is up-to-date
265
    #         self.session.flush()
266
    #         transaction.commit()
267
    #
268
    #         calendar_manager.create_then_remove_fake_event(
269
    #             calendar_class=WorkspaceCalendar,
270
    #             related_object_id=workspace.workspace_id,
271
    #         )
272
    #
273
    # def disable_calendar(self, workspace: Workspace) -> None:
274
    #     # Note: Cyclic imports
275
    #     from tracim.lib.calendar import CalendarManager
276
    #     from tracim.model.organisational import WorkspaceCalendar
277
    #
278
    #     calendar_manager = CalendarManager(self._user)
279
    #     calendar_manager.disable_calendar_file(
280
    #         calendar_class=WorkspaceCalendar,
281
    #         related_object_id=workspace.workspace_id,
282
    #         raise_=False,
283
    #     )
284
285
    def get_base_query(self) -> Query:
        """
        Public accessor for the base workspace query.

        :return: what ``_base_query()`` returns
        """
        base_query = self._base_query()
        return base_query
287
288
    def generate_label(self) -> str:
        """
        Build a default label from the number of similarly named workspaces.

        Workspaces of the role-less base query whose label starts
        (case-insensitively) with the translated word 'Workspace' are counted,
        and the count plus one is put in the translated 'Workspace {}' label.

        :return: Generated workspace label
        """
        translate = self.translator.get_translation
        candidates = self._base_query_without_roles()
        label_pattern = '{0}%'.format(translate('Workspace'))
        similar = candidates.filter(Workspace.label.ilike(label_pattern))

        next_number = similar.count() + 1
        return translate('Workspace {}').format(next_number)
301
302
303
class UnsafeWorkspaceApi(WorkspaceApi):
    """
    WorkspaceApi variant whose base query is not restricted by user roles.
    """

    def _base_query(self):
        """
        Return the query on all non-deleted workspaces.

        No role filter is applied and ``show_deleted`` is not consulted.

        :return: query on Workspace
        """
        # WorkspaceApi.__init__ stores the session as ``_session``; there is
        # no ``session`` attribute on the instance.
        return self._session.query(Workspace) \
            .filter(Workspace.is_deleted == False)  # nopep8
306