senaite.core.exportimport.genericsetup.structure   F
last analyzed

Complexity

Total Complexity 101

Size/Duplication

Total Lines 600
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 101
eloc 379
dl 0
loc 600
rs 2
c 0
b 0
f 0

10 Functions

Rating   Name   Duplication   Size   Complexity  
A import_xml() 0 21 4
A export_xml() 0 3 1
B exportObjects() 0 29 8
A get_id() 0 7 2
B importObjects() 0 29 8
A can_import() 0 6 2
B create_content_slugs() 0 52 6
A can_export() 0 8 2
B create_or_get() 0 52 8
A can_export_type() 0 7 2

23 Methods

Rating   Name   Duplication   Size   Complexity  
A SenaiteSiteXMLAdapter._importNode() 0 13 2
A SenaiteSiteXMLAdapter._get_users() 0 3 1
A SenaiteSiteXMLAdapter._exportNode() 0 18 1
A ATContentXMLAdapter._importNode() 0 17 1
A ATContentXMLAdapter._initWorkflow() 0 20 3
A ATContentXMLAdapter._getObjectNode() 0 13 3
A SenaiteSiteXMLAdapter._extractGroups() 0 12 2
A ATContentXMLAdapter.__init__() 0 2 1
A SenaiteSiteXMLAdapter._extractUsers() 0 13 2
B SenaiteSiteXMLAdapter._initUsers() 0 26 7
A SenaiteSiteXMLAdapter._get_roles_for_principal() 0 7 2
A DXItemXMLAdapter.__init__() 0 2 1
A DXContainerXMLAdapter.__init__() 0 2 1
A ATContentXMLAdapter._initAuditLog() 0 9 3
A ATContentXMLAdapter._exportNode() 0 23 1
B SenaiteSiteXMLAdapter._extractObjects() 0 17 7
A ATContentXMLAdapter._initFields() 0 17 5
A ATContentXMLAdapter._extractAuditLog() 0 6 1
A SenaiteSiteXMLAdapter._get_groups() 0 3 1
B SenaiteSiteXMLAdapter._initGroups() 0 20 6
A ATContentXMLAdapter._extractFields() 0 13 4
A SenaiteSiteXMLAdapter.__init__() 0 2 1
A SenaiteSiteXMLAdapter._get_groups_for_principal() 0 7 2

How to fix   Complexity   

Complexity

Complex classes like senaite.core.exportimport.genericsetup.structure often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
from bika.lims import api
23
from bika.lims import logger
24
from bika.lims.interfaces import IAuditable
25
from bika.lims.interfaces import ISenaiteSiteRoot
26
from DateTime import DateTime
27
from OFS.interfaces import IOrderedContainer
28
from plone.dexterity.interfaces import IDexterityContainer
29
from plone.dexterity.interfaces import IDexterityItem
30
from plone.dexterity.utils import addContentToContainer
31
from plone.memoize.request import cache
32
from Products.Archetypes.interfaces import IBaseObject
33
from Products.CMFPlone.utils import _createObjectByType
34
from Products.CMFPlone.utils import safe_unicode
35
from Products.GenericSetup.interfaces import IBody
36
from Products.GenericSetup.interfaces import INode
37
from Products.GenericSetup.interfaces import ISetupEnviron
38
from Products.GenericSetup.utils import I18NURI
39
from Products.GenericSetup.utils import ObjectManagerHelpers
40
from Products.GenericSetup.utils import XMLAdapterBase
41
from senaite.core.p3compat import cmp
42
from senaite.core.registry import get_registry_record
43
from xml.dom.minidom import parseString
44
from zope.component import adapts
45
from zope.component import getUtility
46
from zope.component import queryMultiAdapter
47
from zope.component.interfaces import IFactory
48
from zope.interface import alsoProvides
49
50
from .config import SITE_ID
51
52
ID_MAP = {}
53
54
55
class SenaiteSiteXMLAdapter(XMLAdapterBase, ObjectManagerHelpers):
56
    """Adapter for the SENAITE root object (portal)
57
    """
58
    adapts(ISenaiteSiteRoot, ISetupEnviron)
59
60
    def __init__(self, context, environ):
61
        super(SenaiteSiteXMLAdapter, self).__init__(context, environ)
62
63
    def _exportNode(self):
64
        """Export the object as a DOM node.
65
        """
66
        node = self._getObjectNode("object")
67
68
        # remember the UID of the item for reference fields
69
        node.setAttribute("uid", "0")
70
71
        # Extract all contained objects
72
        node.appendChild(self._extractObjects())
73
74
        # Extract Groups
75
        node.appendChild(self._extractGroups(self.context))
76
77
        # Extract Users
78
        node.appendChild(self._extractUsers(self.context))
79
80
        return node
81
82
    def _importNode(self, node):
83
        """Import the object from the DOM node.
84
        """
85
        obj_id = str(node.getAttribute("name"))
86
87
        if "acl_users" not in self.context:
88
            return
89
90
        # Add groups and users
91
        self._initGroups(self.context, node)
92
        self._initUsers(self.context, node)
93
94
        self._logger.info("Imported '%r'" % obj_id)
95
96
    def _initGroups(self, context, node):
97
        group_tool = api.get_tool("portal_groups")
98
        for child in node.childNodes:
99
            if child.nodeName != "groups":
100
                continue
101
            for cn in child.childNodes:
102
                if cn.nodeName != "group":
103
                    continue
104
                group_id = cn.firstChild.nodeValue
105
                group = api.user.get_group(group_id)
106
                if not group:
107
                    self._logger.info("Adding group {}".format(group_id))
108
                    roles = cn.getAttribute("roles").split(",")
109
                    group_tool.addGroup(group_id, roles=roles)
110
                    group = group_tool.getGroupById(group_id)
111
112
                # set the group properties
113
                group.setProperties(properties={
114
                    "title": cn.getAttribute("name"),
115
                    "email": cn.getAttribute("email"),
116
                })
117
118
    def _initUsers(self, context, node):
119
        reg_tool = api.get_tool("portal_registration")
120
        for child in node.childNodes:
121
            if child.nodeName != "users":
122
                continue
123
            for cn in child.childNodes:
124
                if cn.nodeName != "user":
125
                    continue
126
                user_id = cn.firstChild.nodeValue
127
                user = api.user.get_user(user_id)
128
129
                if not user: # add new user with password
130
                    self._logger.info("Adding user {}".format(user_id))
131
                    user = reg_tool.addMember(user_id, '12345') 
132
133
                # set the user properties
134
                user.setProperties(properties={
135
                    "fullname": cn.getAttribute("name"),
136
                    "email": cn.getAttribute("email"),
137
                })
138
139
                # add the user to the groups
140
                groups = cn.getAttribute("groups")
141
                if groups:
142
                    group_ids = groups.split(",")
143
                    api.user.add_group(group_ids, user_id)
144
145
    def _get_users(self):
146
        acl_users = api.get_tool("acl_users")
147
        return acl_users.getUsers()
148
149
    def _get_groups(self):
150
        acl_users = api.get_tool("acl_users")
151
        return acl_users.getGroups()
152
153
    def _get_roles_for_principal(self, principal):
154
        """Returs a list of roles for the user/group
155
        """
156
        ignored_roles = ["Authenticated"]
157
        roles = filter(lambda r: r not in ignored_roles,
158
                       principal.getRoles())
159
        return roles
160
161
    def _get_groups_for_principal(self, principal):
162
        """Returs a list of groups for the user/group
163
        """
164
        ignored_groups = ["AuthenticatedUsers"]
165
        groups = filter(lambda r: r not in ignored_groups,
166
                        principal.getGroupIds())
167
        return groups
168
169
    def _extractGroups(self, context):
170
        node = self._doc.createElement("groups")
171
        for group in self._get_groups():
172
            name = group.getGroupName()
173
            roles = self._get_roles_for_principal(group)
174
            child = self._doc.createElement("group")
175
            child.setAttribute("name", safe_unicode(name))
176
            child.setAttribute("roles", ",".join(roles))
177
            text = self._doc.createTextNode(group.getGroupId())
178
            child.appendChild(text)
179
            node.appendChild(child)
180
        return node
181
182
    def _extractUsers(self, context):
183
        node = self._doc.createElement("users")
184
        for user in self._get_users():
185
            name = user.getProperty("fullname")
186
            groups = self._get_groups_for_principal(user)
187
            child = self._doc.createElement("user")
188
            child.setAttribute("name", safe_unicode(name))
189
            child.setAttribute("email", user.getProperty("email"))
190
            child.setAttribute("groups", ",".join(groups))
191
            text = self._doc.createTextNode(user.getId())
192
            child.appendChild(text)
193
            node.appendChild(child)
194
        return node
195
196
    def _extractObjects(self):
197
        fragment = self._doc.createDocumentFragment()
198
        objects = self.context.objectValues()
199
        if not IOrderedContainer.providedBy(self.context):
200
            objects = list(objects)
201
            objects.sort(lambda x, y: cmp(x.getId(), y.getId()))
202
        for obj in objects:
203
            # Check if the object can be exported
204
            if not can_export(obj):
205
                logger.info("Skipping {}".format(repr(obj)))
206
                continue
207
            exporter = queryMultiAdapter((obj, self.environ), INode)
208
            if exporter:
209
                node = exporter.node
210
                if node is not None:
211
                    fragment.appendChild(exporter.node)
212
        return fragment
213
214
215
class ATContentXMLAdapter(SenaiteSiteXMLAdapter):
216
    """AT Content XML Importer/Exporter
217
    """
218
    adapts(IBaseObject, ISetupEnviron)
219
220
    def __init__(self, context, environ):
221
        super(ATContentXMLAdapter, self).__init__(context, environ)
222
223
    def _getObjectNode(self, name, i18n=True):
224
        node = self._doc.createElement(name)
225
        # Attach the UID of the object as well
226
        node.setAttribute("id", api.get_id(self.context))
227
        node.setAttribute("uid", api.get_uid(self.context))
228
        node.setAttribute("name", api.get_id(self.context))
229
        node.setAttribute("meta_type", self.context.meta_type)
230
        node.setAttribute("portal_type", api.get_portal_type(self.context))
231
        i18n_domain = getattr(self.context, "i18n_domain", None)
232
        if i18n and i18n_domain:
233
            node.setAttributeNS(I18NURI, "i18n:domain", i18n_domain)
234
            self._i18n_props = ("title", "description")
235
        return node
236
237
    def _exportNode(self):
238
        """Export the object as a DOM node.
239
        """
240
        node = self._getObjectNode("object")
241
242
        # remember the UID of the item for reference fields
243
        node.setAttribute("uid", api.get_uid(self.context))
244
245
        # remember the WF Status
246
        # TODO: Export the complete Review History
247
        state = api.get_workflow_status_of(self.context)
248
        node.setAttribute("state", state)
249
250
        # Extract AuditLog
251
        node.appendChild(self._extractAuditLog(self.context))
252
253
        # Extract all fields of the current context
254
        node.appendChild(self._extractFields(self.context))
255
256
        # Extract all contained objects
257
        node.appendChild(self._extractObjects())
258
259
        return node
260
261
    def _importNode(self, node):
262
        """Import the object from the DOM node.
263
        """
264
265
        # set workflow state
266
        self._initAuditLog(self.context, node)
267
        self._initWorkflow(self.context, node)
268
        self._initFields(self.context, node)
269
270
        # reindex the object
271
        self.context.reindexObject()
272
273
        # set a new snapshot
274
        # api.snapshot.take_snapshot(self.context)
275
276
        obj_id = str(node.getAttribute("name"))
277
        self._logger.info("Imported '%r'" % obj_id)
278
279
    def _initAuditLog(self, context, node):
280
        for child in node.childNodes:
281
            if child.nodeName == "auditlog":
282
                snapshots = json.loads(child.firstChild.nodeValue)
283
                storage = api.snapshot.get_storage(context)
284
                storage[:] = map(json.dumps, snapshots)[:]
285
                # make sure the object provides `IAuditable`
286
                alsoProvides(context, IAuditable)
287
                return
288
289
    def _initWorkflow(self, context, node):
290
        state = node.getAttribute("state")
291
292
        if not state:
293
            return
294
295
        if state == api.get_workflow_status_of(context):
296
            return
297
298
        wf_state = {
299
            "action": None,
300
            "actor": None,
301
            "comments": "Generic Setup Import",
302
            "review_state": state,
303
            "time": DateTime(),
304
        }
305
306
        wf = api.get_tool("portal_workflow")
307
        wf_id = wf.getChainFor(context)[0]
308
        wf.setStatusOf(wf_id, context, wf_state)
309
310
    def _initFields(self, context, node):
311
        fields = api.get_fields(context)
312
313
        for child in node.childNodes:
314
            # we only handle filed nodes
315
            if child.nodeName != "field":
316
                continue
317
318
            name = child.getAttribute("name")
319
            field = fields.get(name)
320
            if field is None:
321
                self._logger.warning("Unrecognized field '{}'".format(name))
322
                continue
323
324
            importer = queryMultiAdapter((context, field, self.environ), INode)
325
            if importer:
326
                importer.node = child
327
328
    def _extractAuditLog(self, context):
329
        snapshots = api.snapshot.get_snapshots(self.context)
330
        node = self._doc.createElement("auditlog")
331
        child = self._doc.createTextNode(json.dumps(snapshots))
332
        node.appendChild(child)
333
        return node
334
335
    def _extractFields(self, context):
336
        fragment = self._doc.createDocumentFragment()
337
338
        fields = api.get_fields(context)
339
        for name, field in fields.items():
340
            # query the field adapter
341
            exporter = queryMultiAdapter((context, field, self.environ), INode)
342
            if not exporter:
343
                continue
344
            node = exporter.node
345
            if node is not None:
346
                fragment.appendChild(node)
347
        return fragment
348
349
350
class DXContainerXMLAdapter(ATContentXMLAdapter):
351
    """DX Container XML Importer/Exporter
352
    """
353
    adapts(IDexterityContainer, ISetupEnviron)
354
355
    def __init__(self, context, environ):
356
        super(DXContainerXMLAdapter, self).__init__(context, environ)
357
358
359
class DXItemXMLAdapter(ATContentXMLAdapter):
360
    """DX Item XML Importer/Exporter
361
    """
362
    adapts(IDexterityItem, ISetupEnviron)
363
364
    def __init__(self, context, environ):
365
        super(DXItemXMLAdapter, self).__init__(context, environ)
366
367
368
def create_content_slugs(parent, parent_path, context):
369
    """Helper function to create initial content slugs
370
    """
371
    logger.info("create_content_slugs: parent={} parent_path={}".format(
372
        repr(parent), parent_path))
373
374
    path = "%s%s" % (parent_path, get_id(parent))
375
    filename = "%s.xml" % (path)
376
    xml = context.readDataFile(filename)
377
378
    if xml is None:
379
        logger.error("File not found: '{}'".format(filename))
380
        return
381
382
    # parse the XML data
383
    node = parseString(xml)
384
385
    if node.nodeName == "#document":
386
        node = node.firstChild
387
388
    # read the node attributes
389
    name = node.getAttribute("name")
390
    uid = node.getAttribute("uid")
391
    logger.info("::: Processing '{}' (UID {}) in path '{}' :::"
392
                .format(name, uid, path))
393
394
    def is_object_node(n):
395
        return getattr(n, "nodeName", "") == "object"
396
397
    def get_child_nodes(n):
398
        return getattr(n, "childNodes", [])
399
400
    for child in get_child_nodes(node):
401
        # only process `<object ../>` nodes
402
        if not is_object_node(child):
403
            continue
404
        # extract node attributes (see `_exportNode` method)
405
        child_id = child.getAttribute("name")
406
        child_uid = child.getAttribute("uid")
407
        portal_type = child.getAttribute("portal_type")
408
        # get or create object
409
        obj = create_or_get(parent, child_id, child_uid, portal_type)
410
        # handle vanished objects
411
        if obj is None:
412
            logger.warn("Skipping object creation for '{}'".format(path))
413
            continue
414
        # get the id of the new object
415
        obj_id = api.get_id(obj)
416
        # track new ID -> old ID
417
        ID_MAP[obj_id] = child_id
418
        # recursively create contents
419
        create_content_slugs(obj, path + "/", context)
420
421
422
def create_or_get(parent, id, uid, portal_type):
423
    """Create or get the object
424
    """
425
    # return first level objects directly
426
    if api.is_portal(parent):
427
        return parent.get(id)
428
    elif api.get_bika_setup() == parent:
429
        return parent.get(id)
430
    elif api.get_senaite_setup() == parent:
431
        return parent.get(id)
432
433
    # query object by UID
434
    query = {
435
        "UID": uid,
436
        "portal_type": portal_type,
437
        "path": {
438
            "query": api.get_path(parent),
439
        }
440
    }
441
    results = api.search(query, "uid_catalog")
442
    if results:
443
        return api.get_object(results[0])
444
445
    # create object slug
446
    obj = None
447
    # get the fti
448
    types_tool = api.get_tool("portal_types")
449
    fti = types_tool.getTypeInfo(portal_type)
450
    # removed
451
    if not fti:
452
        return None
453
    # old style factory
454
    if fti.product:
455
        # Create AT Content Slug (we take the UID as ID to avoid clashes)
456
        obj = _createObjectByType(portal_type, parent, uid)
457
        # set the old UID to maintain references
458
        obj._setUID(uid)
459
        # IMPORTANT: this will generate a new ID by the ID Server config
460
        obj.processForm()
461
    else:
462
        # Create DX Content Slug
463
        factory = getUtility(IFactory, fti.factory)
464
        tmp_id = str(uid)
465
        obj = factory(tmp_id)
466
        if hasattr(obj, "_setPortalTypeName"):
467
            obj._setPortalTypeName(fti.getId())
468
        # set the old UID to maintain references
469
        setattr(obj, "_plone.uuid", uid)
470
        # IMPORTANT: this will generate a new ID by the ID Server config
471
        obj = addContentToContainer(parent, obj, checkConstraints=False)
472
473
    return obj
474
475
476
@cache(get_key=lambda *args: 'can-export-%s' % args[1])
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
477
def can_export_type(portal_type, request):
478
    """Returns whether objects from this type can be exported
479
    """
480
    name = "generic_setup_skip_export_types"
481
    skip = get_registry_record(name, default=[])
482
    return portal_type not in skip
483
484
485
def can_export(obj):
486
    """Decides if the object can be exported or not
487
    """
488
    if not api.is_object(obj):
489
        return False
490
    portal_type = api.get_portal_type(obj)
491
    request = api.get_request()
492
    return can_export_type(portal_type, request)
493
494
495
def can_import(obj):
496
    """Decides if the object can be imported or not
497
    """
498
    if not api.is_object(obj):
499
        return False
500
    return True
501
502
503
def get_id(obj):
504
    if api.is_portal(obj):
505
        return SITE_ID
506
    oid = api.get_id(obj)
507
    # resolve id from mapping
508
    rid = ID_MAP.get(oid, oid)
509
    return rid.replace(" ", "_")
510
511
512
def exportObjects(obj, parent_path, context):
513
    """ Export subobjects recursively.
514
    """
515
516
    if not can_export(obj):
517
        logger.info("Skipping export of {}".format(repr(obj)))
518
        return
519
520
    if api.is_portal(obj):
521
        # explicitly instantiate the exporter to avoid adapter clash of
522
        # Products.CMFCore.exportimport.properties.PropertiesXMLAdapter
523
        exporter = SenaiteSiteXMLAdapter(obj, context)
524
    else:
525
        exporter = queryMultiAdapter((obj, context), IBody)
526
527
    path = "%s%s" % (parent_path, get_id(obj))
528
    if exporter:
529
        if exporter.name:
530
            path = "%s%s" % (parent_path, exporter.name)
531
        filename = "%s%s" % (path, exporter.suffix)
532
        body = exporter.body
533
        if body is not None:
534
            context.writeDataFile(filename, body, exporter.mime_type)
535
    else:
536
        raise ValueError("No exporter found for object: %r" % obj)
537
538
    if getattr(obj, "objectValues", False):
539
        for sub in obj.objectValues():
540
            exportObjects(sub, path + "/", context)
541
542
543
def importObjects(obj, parent_path, context):
544
    """ Import subobjects recursively.
545
    """
546
547
    if not can_import(obj):
548
        logger.info("Skipping import of {}".format(repr(obj)))
549
        return
550
551
    if api.is_portal(obj):
552
        # explicitly instantiate the importer to avoid adapter clash of
553
        # Products.CMFCore.exportimport.properties.PropertiesXMLAdapter
554
        importer = SenaiteSiteXMLAdapter(obj, context)
555
    else:
556
        importer = queryMultiAdapter((obj, context), IBody)
557
558
    path = "%s%s" % (parent_path, get_id(obj))
559
    __traceback_info__ = path
560
    if importer:
561
        if importer.name:
562
            path = "%s%s" % (parent_path, importer.name)
563
        filename = "%s%s" % (path, importer.suffix)
564
        body = context.readDataFile(filename)
565
        if body is not None:
566
            importer.filename = filename  # for error reporting
567
            importer.body = body
568
569
    if getattr(obj, "objectValues", False):
570
        for sub in obj.objectValues():
571
            importObjects(sub, path + "/", context)
572
573
574
def export_xml(context):
575
    portal = context.getSite()
576
    exportObjects(portal, "", context)
577
578
579
def import_xml(context):
580
    portal = context.getSite()
581
582
    qi = api.get_tool("portal_quickinstaller")
583
    installed = qi.isProductInstalled("senaite.core")
584
    request = api.get_request()
585
586
    # tests call it w/o request
587
    if request and not installed:
588
        logger.debug("Nothing to import.")
589
        return
590
591
    if not context.readDataFile("senaite.xml"):
592
        logger.debug("Nothing to import.")
593
        return
594
595
    # create content slugs for UID references
596
    create_content_slugs(portal, "", context)
597
598
    # import objects
599
    importObjects(portal, "", context)
600