Passed
Push — 2.x ( ed0eb2...58e5cd )
by Jordi
07:43 queued 02:42
created

senaite.core.exportimport.genericsetup.adapters   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 469
Duplicated Lines 5.12 %

Importance

Changes 0
Metric Value
wmc 60
eloc 271
dl 24
loc 469
rs 3.6
c 0
b 0
f 0

33 Methods

Rating   Name   Duplication   Size   Complexity  
A ATFileFieldNodeAdapter.get_content_type() 0 4 1
A DXTupleFieldNodeAdapter.set_field_value() 0 6 2
A ATFileFieldNodeAdapter.set_node_value() 0 5 1
A ATFieldNodeAdapter.get_field_value() 0 4 1
A ATReferenceFieldNodeAdapter.get_json_value() 0 11 3
A ATDateTimeFieldNodeAdapter.get_json_value() 0 7 2
A ATFieldNodeAdapter.set_node_value() 0 6 2
A ATFieldNodeAdapter.parse_json_value() 0 2 1
A DXNamedFileFieldNodeAdapter.get_content_type() 0 4 1
A ATFieldNodeAdapter._importNode() 0 12 4
A DXDateTimeFieldNodeAdapter.parse_json_value() 0 6 2
A ATRichTextFieldNodeAdapter.get_field_value() 12 12 3
A DXRichTextFieldNodeAdapter.get_field_value() 12 12 3
A ATFileFieldNodeAdapter.get_file_data() 0 4 1
A ATFieldNodeAdapter.can_write() 0 7 2
A DXChoiceFieldNodeAdapter.set_field_value() 0 8 2
A DXNamedFileFieldNodeAdapter.set_node_value() 0 6 1
A ATFieldNodeAdapter.set_field_value() 0 9 2
A ATFileFieldNodeAdapter.get_archive_path() 0 7 1
A DXDateTimeFieldNodeAdapter.get_json_value() 0 7 2
A ATFieldNodeAdapter._exportNode() 0 5 1
A DXFieldNodeAdapter.set_field_value() 0 10 2
A ATFieldNodeAdapter.__init__() 0 3 1
A DXFieldNodeAdapter.set_node_value() 0 3 1
A ATFieldNodeAdapter.get_json_value() 0 12 2
A ATDateTimeFieldNodeAdapter.parse_json_value() 0 4 2
A ATFieldNodeAdapter.get_node_value() 0 8 1
A DXNamedFileFieldNodeAdapter.set_field_value() 0 13 2
A ATFileFieldNodeAdapter.get_json_value() 0 15 4
A DXFieldNodeAdapter.can_write() 0 11 3
A DXChoiceFieldNodeAdapter.sanitize_language_value() 0 10 2
A DXFieldNodeAdapter.get_field_value() 0 5 1
A DXFieldNodeAdapter.__init__() 0 3 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like senaite.core.exportimport.genericsetup.adapters often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
from datetime import datetime
23
from mimetypes import guess_type
24
25
import six
26
27
from bika.lims import api
28
from bika.lims import logger
29
from bika.lims.interfaces.field import IUIDReferenceField
30
from DateTime import DateTime
31
from plone.app.blob.interfaces import IBlobField
32
from plone.app.dexterity.behaviors.metadata import default_language
33
from plone.app.textfield.interfaces import IRichText
34
from plone.dexterity.interfaces import IDexterityContent
35
from plone.namedfile.interfaces import INamedField
36
from Products.Archetypes.interfaces import IBaseObject
37
from Products.Archetypes.interfaces import IDateTimeField
38
from Products.Archetypes.interfaces import IField
39
from Products.Archetypes.interfaces import IFileField
40
from Products.Archetypes.interfaces import IReferenceField
41
from Products.Archetypes.interfaces import ITextField
42
from Products.CMFPlone.utils import safe_unicode
43
from Products.GenericSetup.interfaces import ISetupEnviron
44
from Products.GenericSetup.utils import NodeAdapterBase
45
from senaite.core.api import dtime
46
from senaite.core.schema.interfaces import IDataGridField
47
from senaite.core.schema.interfaces import \
48
    IUIDReferenceField as IUIDReferenceFieldDX
49
from z3c.form.interfaces import IDataManager
50
from zope.component import adapts
51
from zope.component import getMultiAdapter
52
from zope.interface import implements
53
from zope.schema.interfaces import ConstraintNotSatisfied
54
from zope.schema.interfaces import IChoice
55
from zope.schema.interfaces import IDatetime
56
from zope.schema.interfaces import IField as ISchemaField
57
from zope.schema.interfaces import IText
58
from zope.schema.interfaces import ITextLine
59
from zope.schema.interfaces import ITuple
60
61
from .config import SITE_ID
62
from .interfaces import IFieldNode
63
from .interfaces import IRecordField
64
65
SKIP_FIELDS = [
66
    "id",
67
    "rights",
68
]
69
70
71
class ATFieldNodeAdapter(NodeAdapterBase):
72
    """Node im- and exporter for AT Fields.
73
    """
74
    implements(IFieldNode)
75
    adapts(IBaseObject, IField, ISetupEnviron)
76
77
    el = "field"
78
79
    def __init__(self, context, field, environ):
80
        super(ATFieldNodeAdapter, self).__init__(context, environ)
81
        self.field = field
82
83
    def can_write(self):
84
        """Checks if the field is writable
85
        """
86
        readonly = getattr(self.field, "readonly", False)
87
        if readonly:
88
            return False
89
        return True
90
91
    def set_field_value(self, value, **kw):
92
        """Set the field value
93
        """
94
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
95
        if not self.can_write():
96
            logger.info("Skipping readonly field %s.%s" % (
97
                self.context.__class__.__name__, self.field.__name__))
98
            return
99
        return self.field.set(self.context, value, **kw)
100
101
    def get_field_value(self):
102
        """Get the field value
103
        """
104
        return self.field.get(self.context)
105
106
    def get_json_value(self):
107
        """JSON converted field value
108
        """
109
        value = self.get_field_value()
110
        try:
111
            # Always handle the value as unicode
112
            return json.dumps(safe_unicode(value))
113
        except TypeError:
114
            logger.error(
115
                "ParseError: '{}.{} ('{}')' is not JSON serializable!".format(
116
                    self.context.getId(), self.field.getName(), repr(value)))
117
            return ""
118
119
    def parse_json_value(self, value):
120
        return json.loads(value)
121
122
    def get_node_value(self, value):
123
        """Convert the field value to a XML node
124
        """
125
        node = self._doc.createElement(self.el)
126
        node.setAttribute("name", self.field.getName())
127
        child = self._doc.createTextNode(value)
128
        node.appendChild(child)
129
        return node
130
131
    def set_node_value(self, node):
132
        value = self.parse_json_value(node.nodeValue)
133
        # encode unicodes to UTF8
134
        if isinstance(value, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable unicode does not seem to be defined.
Loading history...
135
            value = value.encode("utf8")
136
        self.set_field_value(value)
137
138
    def _exportNode(self):
139
        """Export the object as a DOM node.
140
        """
141
        value = self.get_json_value()
142
        return self.get_node_value(value)
143
144
    def _importNode(self, node):
145
        """Import the object from the DOM node.
146
        """
147
        if self.field.getName() in SKIP_FIELDS:
148
            return
149
        child = node.firstChild
150
        if child is None:
151
            return
152
        if child.nodeName != "#text":
153
            logger.warning("No textnode found!")
154
            return False
155
        self.set_node_value(child)
156
157
    node = property(_exportNode, _importNode)
158
159
160
class DXFieldNodeAdapter(ATFieldNodeAdapter):
161
    """Node im- and exporter for DX Fields.
162
    """
163
    implements(IFieldNode)
164
    adapts(IDexterityContent, ISchemaField, ISetupEnviron)
165
166
    def __init__(self, context, field, environ):
167
        super(DXFieldNodeAdapter, self).__init__(context, field, environ)
168
        self.field = field
169
170
    def can_write(self):
171
        """Checks if the field is writable
172
        """
173
        readonly = getattr(self.field, "readonly", False)
174
        if readonly:
175
            return False
176
        dm = getMultiAdapter((self.context, self.field), IDataManager)
177
        writable = dm.canWrite()
178
        if not writable:
179
            return False
180
        return True
181
182
    def set_node_value(self, node):
183
        value = self.parse_json_value(node.nodeValue)
184
        self.set_field_value(value)
185
186
    def set_field_value(self, value, **kw):
187
        """Set the field value
188
        """
189
        if not self.can_write():
190
            logger.info("Skipping readonly field %s.%s" % (
191
                self.context.__class__.__name__, self.field.__name__))
192
            return
193
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
194
        dm = getMultiAdapter((self.context, self.field), IDataManager)
195
        dm.set(value, **kw)
196
197
    def get_field_value(self):
198
        """Get the field value
199
        """
200
        dm = getMultiAdapter((self.context, self.field), IDataManager)
201
        return dm.get()
202
203
204
class ATTextFieldNodeAdapter(ATFieldNodeAdapter):
205
    """Import/Export Text
206
    """
207
    adapts(IBaseObject, ITextField, ISetupEnviron)
208
209
210
class ATFileFieldNodeAdapter(ATFieldNodeAdapter):
211
    """Import/Export Files/Images
212
    """
213
    adapts(IBaseObject, IFileField, ISetupEnviron)
214
215
    def set_node_value(self, node):
216
        filename = node.nodeValue
217
        filepath = "/".join([self.get_archive_path(), filename])
218
        data = self.get_file_data(filepath)
219
        self.set_field_value(data, filename=filename)
220
221
    def get_archive_path(self):
222
        """Get the unified archive path
223
        """
224
        site = self.environ.getSite()
225
        site_path = api.get_path(site)
226
        obj_path = api.get_path(self.context)
227
        return obj_path.replace(site_path, SITE_ID, 1)
228
229
    def get_file_data(self, path):
230
        """Return the file data from the archive path
231
        """
232
        return self.environ.readDataFile(path)
233
234
    def get_content_type(self, content, default="application/octet-stream"):
235
        """Returns the content type of the object
236
        """
237
        return getattr(content, "content_type", default)
238
239
    def get_json_value(self):
240
        """Returns the filename
241
        """
242
        value = self.get_field_value()
243
244
        if isinstance(value, six.string_types):
245
            return value
246
247
        filename = safe_unicode(value.filename) or ""
248
        data = value.data
249
        if filename and data:
250
            path = self.get_archive_path()
251
            content_type = self.get_content_type(value)
252
            self.environ.writeDataFile(filename, str(data), content_type, path)
253
        return filename
254
255
256
class ATBlobFileFieldNodeAdapter(ATFileFieldNodeAdapter):
257
    """Import/Export AT Files/Images
258
    """
259
    adapts(IBaseObject, IBlobField, ISetupEnviron)
260
261
262
class DXNamedFileFieldNodeAdapter(ATBlobFileFieldNodeAdapter):
263
    """Import/Export DX Files/Images
264
    """
265
    adapts(IDexterityContent, INamedField, ISetupEnviron)
266
267
    def get_content_type(self, content, default="application/octet-stream"):
268
        """Returns the content type of the object
269
        """
270
        return getattr(content, "contentType", default)
271
272
    def set_node_value(self, node):
273
        filename = node.nodeValue
274
        filepath = "/".join([self.get_archive_path(), filename])
275
        data = self.get_file_data(filepath)
276
        mime_type, encoding = guess_type(filename)
277
        self.set_field_value(data, filename=filename, content_type=mime_type)
278
279
    def set_field_value(self, value, **kw):
280
        """Set the field value
281
        """
282
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
283
        data = value
284
        if not data:
285
            logger.error("Can not set empty file contents")
286
            return
287
        filename = kw.get("filename", "")
288
        contentType = kw.get("mimetype") or kw.get("content_type")
289
        value = self.field._type(
290
            data=data, contentType=contentType, filename=filename)
291
        self.field.set(self.context, value)
292
293
294
class ATDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
295
    """Import/Export Date Fields
296
    """
297
    adapts(IBaseObject, IDateTimeField, ISetupEnviron)
298
299
    def get_json_value(self):
300
        """Returns the date as ISO string
301
        """
302
        value = self.field.get(self.context)
303
        if not isinstance(value, DateTime):
304
            return ""
305
        return value.ISO()
306
307
    def parse_json_value(self, value):
308
        if not value:
309
            return None
310
        return DateTime(value)
311
312
313
class DXDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
314
    """Import/Export Date Fields
315
    """
316
    adapts(IDexterityContent, IDatetime, ISetupEnviron)
317
318
    def get_json_value(self):
319
        """Returns the date as ISO string
320
        """
321
        value = self.field.get(self.context)
322
        if not isinstance(value, datetime):
323
            return ""
324
        return dtime.to_iso_format(value)
325
326
    def parse_json_value(self, value):
327
        if not value:
328
            return None
329
        # Avoid `UnknownTimeZoneError` by using the date API for conversion
330
        # also see https://github.com/senaite/senaite.patient/pull/29
331
        return dtime.to_dt(value)
332
333
334
class ATReferenceFieldNodeAdapter(ATFieldNodeAdapter):
335
    """Import/Export UID Reference Fields
336
    """
337
    adapts(IBaseObject, IReferenceField, ISetupEnviron)
338
339
    def get_json_value(self):
340
        """Convert referenced objects to UIDs
341
        """
342
        value = self.field.get(self.context)
343
        if api.is_object(value):
344
            value = api.get_uid(value)
345
        elif isinstance(value, list):
346
            value = map(api.get_uid, value)
347
        else:
348
            value = ""
349
        return json.dumps(value)
350
351
352
class ATUIDReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
353
    """Import/Export UID Reference Fields
354
    """
355
    adapts(IBaseObject, IUIDReferenceField, ISetupEnviron)
356
357
358
class ATRecordFieldNodeAdapter(ATFieldNodeAdapter):
359
    """Import/Export Records Fields
360
    """
361
    adapts(IBaseObject, IRecordField, ISetupEnviron)
362
363
364
class ATRichTextFieldNodeAdapter(ATFieldNodeAdapter):
365
    """Node im- and exporter for AT RichText fields.
366
    """
367
    implements(IFieldNode)
368
    adapts(IBaseObject, IRichText, ISetupEnviron)
369
370 View Code Duplication
    def get_field_value(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
371
        """Get the field value
372
        """
373
        value = self.field.get(self.context)
374
        if not value:
375
            return ""
376
        try:
377
            return value.raw
378
        except AttributeError as e:
379
            logger.info("Imported value has no Attribute 'raw' {}"
380
                        .format(str(e)))
381
            return value
382
383
384
class DXTupleFieldNodeAdapter(DXFieldNodeAdapter):
385
    """Node im- and exporter for DX Tuple fields.
386
    """
387
    implements(IFieldNode)
388
    adapts(IDexterityContent, ITuple, ISetupEnviron)
389
390
    def set_field_value(self, value, **kw):
391
        """Set the field value
392
        """
393
        if isinstance(value, list):
394
            value = tuple(value)
395
        super(DXTupleFieldNodeAdapter, self).set_field_value(value, **kw)
396
397
398
class DXTextLineFieldNodeAdapter(DXFieldNodeAdapter):
399
    """Node im- and exporter for DX TextLine fields.
400
    """
401
    implements(IFieldNode)
402
    adapts(IDexterityContent, ITextLine, ISetupEnviron)
403
404
405
class DXTextFieldNodeAdapter(DXFieldNodeAdapter):
406
    """Node im- and exporter for DX Text fields.
407
    """
408
    implements(IFieldNode)
409
    adapts(IDexterityContent, IText, ISetupEnviron)
410
411
412
class DXRichTextFieldNodeAdapter(DXFieldNodeAdapter):
413
    """Node im- and exporter for DX RichText fields.
414
    """
415
    implements(IFieldNode)
416
    adapts(IDexterityContent, IRichText, ISetupEnviron)
417
418 View Code Duplication
    def get_field_value(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
419
        """Get the field value
420
        """
421
        value = self.field.get(self.context)
422
        if not value:
423
            return ""
424
        try:
425
            return value.raw
426
        except AttributeError as e:
427
            logger.info("Imported value has no Attribute 'raw' {}"
428
                        .format(str(e)))
429
            return value
430
431
432
class DXReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
433
    """Import/Export DX UID Reference Fields
434
    """
435
    adapts(IDexterityContent, IUIDReferenceFieldDX, ISetupEnviron)
436
437
438
class DXDataGridFieldNodeAdapter(ATRecordFieldNodeAdapter):
439
    """Import/Export DataGrid Fields
440
    """
441
    adapts(IDexterityContent, IDataGridField, ISetupEnviron)
442
443
444
class DXChoiceFieldNodeAdapter(DXFieldNodeAdapter):
445
    """Import/Export Choice Fields
446
    """
447
    implements(IFieldNode)
448
    adapts(IDexterityContent, IChoice, ISetupEnviron)
449
450
    def set_field_value(self, value, **kw):
451
        """Set the field value
452
        """
453
        # Raises an ConstraintNotSatisfied error when the language is not
454
        # in the vocabulary of available languages
455
        if self.field.getName() == "language":
456
            value = self.sanitize_language_value(value)
457
        super(DXChoiceFieldNodeAdapter, self).set_field_value(value, **kw)
458
459
    def sanitize_language_value(self, value):
460
        """Ensure the given language is in the language vocabulary
461
462
        This avoids that the validator fails
463
        """
464
        try:
465
            self.field._validate(value)
466
            return value
467
        except ConstraintNotSatisfied:
468
            return default_language(self.context)
469