Passed
Push — 2.x ( 2a1b20...d626d0 )
by Jordi
07:27
created

senaite.core.content.contact.Contact._linkUser()   B

Complexity

Conditions 7

Size

Total Lines 58
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 58
rs 7.76
c 0
b 0
f 0
cc 7
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import types
22
23
from AccessControl import ClassSecurityInfo
24
from Acquisition import aq_inner
25
from Acquisition import aq_parent
26
from bika.lims import logger
27
from bika.lims import senaiteMessageFactory as _
28
from bika.lims.api import get_path
29
from bika.lims.api import is_active
30
from bika.lims.api import search
31
from bika.lims.api import security as sec_api
32
from bika.lims.interfaces import IClient
33
from bika.lims.interfaces import IDeactivable
34
from plone import api
35
from plone.autoform import directives
36
from plone.supermodel import model
37
from Products.CMFCore import permissions
38
from senaite.core.catalog import CONTACT_CATALOG
39
from senaite.core.content.person import IPersonSchema
40
from senaite.core.content.person import Person
41
from senaite.core.interfaces import IContact
42
from senaite.core.schema import UIDReferenceField
43
from senaite.core.z3cform.widgets.uidreference import UIDReferenceWidgetFactory
44
from zope.interface import implementer
45
46
CONTACT_UID_KEY = "linked_contact_uid"
47
48
49
class IContactSchema(IPersonSchema):
50
    """Contact Schema extending Person
51
    """
52
53
    # Publication preference
54
    model.fieldset(
55
        "publication_preference",
56
        label=_(
57
            u"label_publication_preference",
58
            default=u"Publication preference"
59
        ),
60
        fields=[
61
            "cc_contact",
62
        ]
63
    )
64
65
    directives.widget(
66
        "cc_contact",
67
        UIDReferenceWidgetFactory,
68
        catalog=CONTACT_CATALOG,
69
        query="get_widget_cccontact_query",
70
        columns=[
71
            {"name": "getFullname", "label": _("Name")},
72
            {"name": "getEmailAddress", "label": _("Email")},
73
        ],
74
    )
75
    cc_contact = UIDReferenceField(
76
        title=_(
77
            u"label_contact_cccontact",
78
            default=u"Contacts to CC"
79
        ),
80
        description=_(
81
            u"description_contact_cccontact",
82
            default=u"Contacts in CC for new samples"
83
        ),
84
        allowed_types=("Contact",),
85
        multi_valued=True,
86
        required=False,
87
    )
88
89
90
@implementer(IContact, IContactSchema, IDeactivable)
91
class Contact(Person):
92
    """A Contact of a Client which can be linked to a System User
93
    """
94
    # Catalogs where this type will be catalogued
95
    _catalogs = [CONTACT_CATALOG]
96
97
    security = ClassSecurityInfo()
98
99 View Code Duplication
    def get_widget_cccontact_query(self, **kw):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
100
        """Return the query for the CCContact field
101
        """
102
        path = get_path(self.aq_parent)
103
        query = {
104
            "portal_type": "Contact",
105
            "path": {"query": path, "depth": 1},
106
            "is_active": True,
107
            "sort_on": "sortable_title",
108
            "sort_order": "ascending",
109
        }
110
        logger.info("get_widget_contact_query: %r" % query)
111
        return query
112
113
    @classmethod
114
    def getContactByUsername(cls, username):
115
        """Convenience Classmethod which returns a Contact by a Username
116
117
        NOTE: This method used the contact catalog to search for Contacts by the
118
        index `getUsername`.
119
        """
120
        # Check if the User is linked already
121
        query = {"portal_type": cls.__name__, "getUsername": username}
122
        contacts = search(query, CONTACT_CATALOG)
123
124
        # No Contact assigned to this username
125
        if len(contacts) == 0:
126
            return None
127
128
        # Multiple Users assigned, this should never happen
129
        if len(contacts) > 1:
130
            logger.error("User '{}' is bound to multiple Contacts '{}'".format(
131
                username, ",".join(map(lambda c: c.Title, contacts))))
132
            return map(lambda x: x.getObject(), contacts)
133
134
        # Return the found Contact object
135
        return contacts[0].getObject()
136
137
    def isActive(self):
138
        """Checks if the Contact is active
139
        """
140
        return is_active(self)
141
142
    @security.protected(permissions.ModifyPortalContent)
143
    def getUser(self):
144
        """Returns the linked Plone User or None
145
        """
146
        username = self.getUsername()
147
        if not username:
148
            return None
149
        user = api.user.get(userid=username)
150
        return user
151
152 View Code Duplication
    @security.protected(permissions.ModifyPortalContent)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
153
    def setUser(self, user_or_username):
154
        """Link the user to the Contact
155
156
        :returns: True if OK, False if the User could not be linked
157
        :rtype: bool
158
        """
159
        user = None
160
        userid = None
161
162
        # Handle User IDs (strings)
163
        if isinstance(user_or_username, types.StringTypes):
164
            userid = user_or_username
165
            user = api.user.get(userid)
166
        # Handle User Objects (MemberData/PloneUser)
167
        if hasattr(user_or_username, "getId"):
168
            userid = user_or_username.getId()
169
            user = user_or_username
170
171
        # Not a valid user
172
        if user is None:
173
            return False
174
175
        # Link the User
176
        return self._linkUser(user)
177
178 View Code Duplication
    @security.protected(permissions.ModifyPortalContent)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
179
    def unlinkUser(self, delete=False):
180
        """Unlink the user to the Contact
181
182
        :returns: True if OK, False if no User was unlinked
183
        :rtype: bool
184
        """
185
        userid = self.getUsername()
186
        user = self.getUser()
187
        if user:
188
            logger.debug("Unlinking User '{}' from Contact '{}'".format(
189
                userid, self.Title()))
190
191
            # Unlink the User
192
            if not self._unlinkUser():
193
                return False
194
195
            # Also remove the Plone User (caution)
196
            if delete:
197
                logger.debug("Removing Plone User '{}'".format(userid))
198
                api.user.delete(username=userid)
199
200
            return True
201
        return False
202
203
    @security.protected(permissions.ModifyPortalContent)
204
    def hasUser(self):
205
        """Check if Contact has a linked a System User
206
        """
207
        user = self.getUser()
208
        if user is None:
209
            return False
210
        return True
211
212
    def getParentUID(self):
213
        return self.aq_parent.UID()
214
215
    def getParent(self):
216
        return aq_parent(aq_inner(self))
217
218
    def isGlobal(self):
219
        """Check if Contact is global (not under a Client)
220
        """
221
        parent = self.getParent()
222
        if IClient.providedBy(parent):
223
            return False
224
        return True
225
226
    @security.private
227
    def _linkUser(self, user):
228
        """Set the UID of the current Contact in the User properties and update
229
        all relevant own properties.
230
        """
231
        username = user.getId()
232
        contact = self.getContactByUsername(username)
233
234
        # User is linked to another contact (fix in UI)
235
        if contact and contact.UID() != self.UID():
236
            raise ValueError("User '{}' is already linked to Contact '{}'"
237
                             .format(username, contact.Title()))
238
239
        # User is linked to multiple other contacts (fix in Data)
240
        if isinstance(contact, list):
241
            raise ValueError("User '{}' is linked to multiple Contacts: '{}'"
242
                             .format(username, ",".join(
243
                                 map(lambda x: x.Title(), contact))))
244
245
        # Linked Contact UID is used in member profile as backreference
246
        try:
247
            user.getProperty(CONTACT_UID_KEY)
248
        except ValueError:
249
            logger.info("Adding User property {}".format(CONTACT_UID_KEY))
250
            user._tool.manage_addProperty(CONTACT_UID_KEY, "", "string")
251
252
        # Set the UID as a User Property
253
        uid = self.UID()
254
        user.setMemberProperties({CONTACT_UID_KEY: uid})
255
        logger.info("Linked Contact UID {} to User {}".format(
256
            user.getProperty(CONTACT_UID_KEY), username))
257
258
        # Set the Username
259
        self.setUsername(user.getId())
260
261
        # Update the Email address from the user
262
        self.setEmailAddress(user.getProperty("email"))
263
264
        # set the Fullname of the User
265
        user.setProperties(fullname=self.Title())
266
267
        # grant the owner role
268
        sec_api.grant_local_roles_for(self, roles=["Owner"], user=user)
269
270
        # Allow modificiations
271
        sec_api.grant_permission_for(self, permissions.ModifyPortalContent,
272
                                     ["Owner"], acquire=True)
273
274
        # somehow the `getUsername` index gets out of sync
275
        self.reindexObject()
276
277
        # N.B. Local owner role and client group applies only to client
278
        #      contacts, but not lab contacts.
279
        if IClient.providedBy(self.aq_parent):
280
            # add user to clients group
281
            self.aq_parent.add_user_to_group(username)
282
283
        return True
284
285
    @security.private
286
    def _unlinkUser(self):
287
        """Remove the UID of the current Contact in the User properties and
288
        update all relevant own properties.
289
        """
290
        # Nothing to do if no user is linked
291
        if not self.hasUser():
292
            return False
293
294
        user = self.getUser()
295
        username = user.getId()
296
297
        # Unset the UID from the User Property
298
        user.setMemberProperties({CONTACT_UID_KEY: ""})
299
        logger.info("Unlinked Contact UID from User {}"
300
                    .format(user.getProperty(CONTACT_UID_KEY, "")))
301
302
        # Unset the Username
303
        self.setUsername("")
304
305
        # Unset the Email
306
        self.setEmailAddress("")
307
308
        # revoke the owner role
309
        sec_api.revoke_local_roles_for(self, roles=["Owner"], user=user)
310
311
        # Disallow modificiations
312
        sec_api.revoke_permission_for(self, permissions.ModifyPortalContent,
313
                                      ["Owner"], acquire=True)
314
315
        # somehow the `getUsername` index gets out of sync
316
        self.reindexObject()
317
318
        # N.B. Local owner role and client group applies only to client
319
        #      contacts, but not lab contacts.
320
        if IClient.providedBy(self.aq_parent):
321
            # remove user from clients group
322
            self.aq_parent.del_user_from_group(username)
323
324
        return True
325
326
    @security.protected(permissions.View)
327
    def getRawCCContact(self):
328
        accessor = self.accessor("cc_contact", raw=True)
329
        return accessor(self)
330
331
    @security.protected(permissions.View)
332
    def getCCContact(self):
333
        accessor = self.accessor("cc_contact")
334
        return accessor(self)
335
336
    @security.protected(permissions.ModifyPortalContent)
337
    def setCCContact(self, value):
338
        mutator = self.mutator("cc_contact")
339
        mutator(self, value)
340
341
    # BBB: AT schema field property
342
    CCContact = property(getCCContact, setCCContact)
343