Passed
Push — 2.x ( 47fbd3...321cdf )
by Jordi
10:23
created

bika.lims.content.contact.Contact.getParent()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import types
22
23
from AccessControl import ClassSecurityInfo
24
from Acquisition import aq_base
25
from Acquisition import aq_inner
26
from Acquisition import aq_parent
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims import logger
29
from bika.lims.api import is_active
30
from bika.lims.config import PROJECTNAME
31
from bika.lims.content.person import Person
32
from bika.lims.interfaces import IClient
33
from bika.lims.interfaces import IContact
34
from bika.lims.interfaces import IDeactivable
35
from plone import api
36
from Products.Archetypes import atapi
37
from Products.Archetypes.utils import DisplayList
38
from Products.CMFCore.permissions import ModifyPortalContent
39
from Products.CMFPlone.utils import safe_unicode
40
from senaite.core.p3compat import cmp
41
from zope.interface import implements
42
43
# NOTE(review): not referenced anywhere else in this module as shown;
# presumably kept for code importing it from here - confirm
ACTIVE_STATES = ["active"]


# The Person schema, extended with a multi-valued reference to other Contacts
schema = Person.schema.copy() + atapi.Schema((
    atapi.ReferenceField(
        "CCContact",
        schemata="Publication preference",
        # Name of the method that provides the selectable values
        vocabulary="getContacts",
        multiValued=1,
        allowed_types=("Contact",),
        relationship="ContactContact",
        widget=atapi.ReferenceWidget(
            checkbox_bound=0,
            label=_("Contacts to CC"),
        ),
    ),
))


# Move the job title and the department to the default schemata
schema["JobTitle"].schemata = "default"
schema["Department"].schemata = "default"
# The title is not required and not shown in the form, because it is computed
# from the Person's Fullname
schema["title"].required = 0
schema["title"].widget.visible = False
65
66
67
class Contact(Person):
    """Contact person of a Client, which can be linked to a system user
    """
    implements(IContact, IDeactivable)

    # Archetypes schema of the type: the extended Person schema that is
    # defined at module level
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
73
    # NOTE(review): presumably hides the "Contents" tab in the UI - confirm
    displayContentsTab = False
    # Security declarations, used by the method decorators of this class
    security = ClassSecurityInfo()
    # Archetypes flag requesting a rename of the object after its creation
    # (see `_renameAfterCreation`)
    _at_rename_after_creation = True
76
77
    @classmethod
    def getContactByUsername(cls, username):
        """Convenience classmethod which returns a Contact by a username

        Searches the portal catalog for objects of the portal type of this
        class whose `getUsername` index matches the given username.

        :param username: value to look up in the `getUsername` catalog index
        :returns: None if no Contact was found, the Contact object if exactly
            one was found, or a list of Contact objects if more than one was
            found (an error is logged in this case)
        """
        catalog = api.portal.get_tool("portal_catalog")
        brains = catalog(portal_type=cls.portal_type, getUsername=username)
        count = len(brains)

        # No Contact assigned to this username
        if count == 0:
            return None

        # Multiple Contacts assigned, this should never happen
        if count > 1:
            logger.error("User '{}' is bound to multiple Contacts '{}'".format(
                username, ",".join([brain.Title for brain in brains])))
            # Build a real list: `map` returns a lazy iterator on Python 3,
            # which `isinstance(contact, list)` in `_linkUser` does not match
            return [brain.getObject() for brain in brains]

        # Return the found Contact object
        return brains[0].getObject()
99
100
    def Title(self):
        """Return the fullname of the contact as an UTF-8 encoded string
        """
        fullname = safe_unicode(self.getFullname())
        return fullname.encode("utf-8")
104
105
    def isActive(self):
        """Tell whether this Contact is active

        Delegates the check to `bika.lims.api.is_active`.
        """
        active = is_active(self)
        return active
109
110
    @security.protected(ModifyPortalContent)
    def getUser(self):
        """Return the Plone user linked to this Contact

        :returns: the result of the user lookup by the stored username, or
            None if this Contact has no username set
        """
        username = self.getUsername()
        if username:
            return api.user.get(userid=username)
        return None
119
120
    @security.protected(ModifyPortalContent)
    def setUser(self, user_or_username):
        """Link the user to the Contact

        :param user_or_username: either a user id (string) or a user object
            that provides `getId` (MemberData/PloneUser)
        :returns: True if OK, False if the User could not be linked
        :rtype: bool
        :raises ValueError: raised by `_linkUser` if the user is already
            linked to another Contact
        """
        user = None

        # `types.StringTypes` only exists on Python 2, fall back to `str`
        string_types = getattr(types, "StringTypes", (str,))

        # Handle User IDs (strings)
        if isinstance(user_or_username, string_types):
            user = api.user.get(user_or_username)
        # Handle User Objects (MemberData/PloneUser)
        if hasattr(user_or_username, "getId"):
            user = user_or_username

        # Not a valid user
        if user is None:
            return False

        # Link the User
        return self._linkUser(user)
145
146
    @security.protected(ModifyPortalContent)
    def unlinkUser(self, delete=False):
        """Unlink the user from the Contact

        :param delete: if True, the Plone user is deleted as well once it was
            unlinked successfully
        :returns: True if OK, False if no User was unlinked
        :rtype: bool
        """
        # Read the user id up front, because unlinking resets the username
        userid = self.getUsername()
        user = self.getUser()

        # No user linked, nothing to do
        if not user:
            return False

        logger.debug("Unlinking User '{}' from Contact '{}'".format(
            userid, self.Title()))

        # Unlink the User
        unlinked = self._unlinkUser()
        if not unlinked:
            return False

        # Also remove the Plone User (caution)
        if delete:
            logger.debug("Removing Plone User '{}'".format(userid))
            api.user.delete(username=userid)

        return True
170
171
    @security.protected(ModifyPortalContent)
    def hasUser(self):
        """Tell whether a system user is linked to this Contact

        :returns: True if `getUser` returns a user, False if it returns None
        :rtype: bool
        """
        return self.getUser() is not None
179
180
    def getContacts(self, dl=True):
        """Return the other active Contacts located in the same parent

        This method is set as the vocabulary of the `CCContact` field.

        :param dl: if True (default), a DisplayList of (UID, Title) pairs
            sorted by the lowercased title is returned; otherwise the
            Contact objects are returned as an unsorted list
        :returns: DisplayList or list of Contact objects
        """
        own_uid = self.UID()
        others = [
            contact for contact in self.aq_parent.objectValues("Contact")
            if is_active(contact) and contact.UID() != own_uid
        ]
        if not dl:
            return others

        pairs = [(contact.UID(), contact.Title()) for contact in others]
        # Sort by key: `list.sort` does not accept a positional comparison
        # function on Python 3
        pairs.sort(key=lambda pair: pair[1].lower())
        # Always return a DisplayList here. The `dl and X or Y` idiom returns
        # the plain list instead whenever the DisplayList is falsy (empty)
        return DisplayList(pairs)
190
191
    def getParentUID(self):
        """Return the UID of the acquisition parent of this Contact
        """
        parent = self.aq_parent
        return parent.UID()
193
194
    def getParent(self):
        """Return the acquisition parent of the innermost wrapper of this
        Contact
        """
        inner = aq_inner(self)
        return aq_parent(inner)
196
197
    def _renameAfterCreation(self, check_auto_id=False):
        """Rename this Contact by means of the ID server

        :param check_auto_id: not used by this implementation
        """
        # Imported inside the method instead of at module level
        # NOTE(review): presumably to avoid a circular import - confirm
        from bika.lims.idserver import renameAfterCreation as rename
        rename(self)
200
201
    @security.private
    def _linkUser(self, user):
        """Set the UID of the current Contact in the User properties and update
        all relevant own properties.

        :param user: the Plone user object to link to this Contact
        :returns: True once the user was linked
        :rtype: bool
        :raises ValueError: if the user is already linked to another Contact
            or to multiple Contacts
        """
        KEY = "linked_contact_uid"

        username = user.getId()
        contact = self.getContactByUsername(username)

        # User is linked to multiple other contacts (fix in Data)
        # This must be checked first: a list has no `UID` method, so the
        # comparison below would fail with an AttributeError
        if isinstance(contact, list):
            raise ValueError("User '{}' is linked to multiple Contacts: '{}'"
                             .format(username, ",".join(
                                 [item.Title() for item in contact])))

        # User is linked to another contact (fix in UI)
        if contact and contact.UID() != self.UID():
            raise ValueError("User '{}' is already linked to Contact '{}'"
                             .format(username, contact.Title()))

        # XXX: Does it make sense to "remember" the UID as a User property?
        try:
            user.getProperty(KEY)
        except ValueError:
            # The property is not known yet, register it first
            logger.info("Adding User property {}".format(KEY))
            user._tool.manage_addProperty(KEY, "", "string")

        # Set the UID as a User Property
        uid = self.UID()
        user.setMemberProperties({KEY: uid})
        logger.info("Linked Contact UID {} to User {}".format(
            user.getProperty(KEY), username))

        # Set the Username
        self.setUsername(username)

        # Update the Email address from the user
        self.setEmailAddress(user.getProperty("email"))

        # somehow the `getUsername` index gets out of sync
        self.reindexObject()

        # N.B. Local owner role and client group applies only to client
        #      contacts, but not lab contacts.
        if IClient.providedBy(self.aq_parent):
            # Add user to "Clients" group
            self._addUserToGroup(username, group="Clients")
            self._recursive_reindex_object(self.aq_parent)

        return True
252
253
    @security.private
    def _unlinkUser(self):
        """Remove the UID of the current Contact in the User properties and
        update all relevant own properties.

        :returns: True if the user was unlinked, False if no user is linked
        :rtype: bool
        """
        KEY = "linked_contact_uid"

        # Nothing to do if no user is linked
        user = self.getUser()
        if user is None:
            return False

        username = user.getId()

        # Unset the UID from the User Property
        user.setMemberProperties({KEY: ""})
        # Log the username: the property was just reset and is empty now
        logger.info("Unlinked Contact UID from User {}".format(username))

        # Unset the Username
        self.setUsername("")

        # Unset the Email
        self.setEmailAddress("")

        # somehow the `getUsername` index gets out of sync
        self.reindexObject()

        # N.B. Local owner role and client group applies only to client
        #      contacts, but not lab contacts.
        if IClient.providedBy(self.aq_parent):
            # Remove user from "Clients" group
            self._delUserFromGroup(username, group="Clients")
            self._recursive_reindex_object(self.aq_parent)

        return True
289
290
    @security.private
    def _addUserToGroup(self, username, group="Clients"):
        """Add the user to the group

        :param username: id of the member to add
        :param group: id of the group the member is added to
        """
        groups_tool = api.portal.get_tool("portal_groups")
        target = groups_tool.getGroupById(group)
        target.addMember(username)
297
298
    @security.private
    def _delUserFromGroup(self, username, group="Clients"):
        """Remove the user from the group

        :param username: id of the member to remove
        :param group: id of the group the member is removed from
        """
        groups_tool = api.portal.get_tool("portal_groups")
        target = groups_tool.getGroupById(group)
        target.removeMember(username)
305
306
    def _recursive_reindex_object(self, obj):
        """Reindex the `allowedRolesAndUsers` index of the object and of all
        objects contained in it

        Contained objects are reindexed before the object itself.

        :param obj: the object to start the reindexing from
        """
        # Only objects that provide `objectValues` themselves (not through
        # acquisition) are descended into
        is_container = hasattr(aq_base(obj), "objectValues")
        children = obj.objectValues() if is_container else ()
        for child in children:
            self._recursive_reindex_object(child)

        logger.debug("Reindexing object {}".format(repr(obj)))
        obj.reindexObject(idxs=["allowedRolesAndUsers"])
315
316
317
# Register the Contact class with Archetypes under the project name
atapi.registerType(Contact, PROJECTNAME)
318