Passed
Push — 2.x ( 112763...d2822b )
by Jordi
05:43
created

senaite.core.exportimport.genericsetup.adapters   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 439
Duplicated Lines 5.47 %

Importance

Changes 0
Metric Value
wmc 56
eloc 255
dl 24
loc 439
rs 5.5199
c 0
b 0
f 0

31 Methods

Rating   Name   Duplication   Size   Complexity  
A ATFileFieldNodeAdapter.get_content_type() 0 4 1
A DXTupleFieldNodeAdapter.set_field_value() 0 6 2
A ATFileFieldNodeAdapter.set_node_value() 0 5 1
A ATFieldNodeAdapter.get_field_value() 0 4 1
A ATReferenceFieldNodeAdapter.get_json_value() 0 11 3
A ATDateTimeFieldNodeAdapter.get_json_value() 0 7 2
A ATFieldNodeAdapter.set_node_value() 0 6 2
A ATFieldNodeAdapter.parse_json_value() 0 2 1
A DXNamedFileFieldNodeAdapter.get_content_type() 0 4 1
A ATFieldNodeAdapter._importNode() 0 12 4
A DXDateTimeFieldNodeAdapter.parse_json_value() 0 6 2
A ATRichTextFieldNodeAdapter.get_field_value() 12 12 3
A DXRichTextFieldNodeAdapter.get_field_value() 12 12 3
A ATFileFieldNodeAdapter.get_file_data() 0 4 1
A ATFieldNodeAdapter.can_write() 0 7 2
A DXNamedFileFieldNodeAdapter.set_node_value() 0 6 1
A ATFieldNodeAdapter.set_field_value() 0 9 2
A ATFileFieldNodeAdapter.get_archive_path() 0 7 1
A DXDateTimeFieldNodeAdapter.get_json_value() 0 7 2
A ATFieldNodeAdapter._exportNode() 0 5 1
A DXFieldNodeAdapter.set_field_value() 0 10 2
A ATFieldNodeAdapter.__init__() 0 3 1
A DXFieldNodeAdapter.set_node_value() 0 3 1
A ATFieldNodeAdapter.get_json_value() 0 12 2
A ATDateTimeFieldNodeAdapter.parse_json_value() 0 4 2
A ATFieldNodeAdapter.get_node_value() 0 8 1
A DXNamedFileFieldNodeAdapter.set_field_value() 0 13 2
A ATFileFieldNodeAdapter.get_json_value() 0 15 4
A DXFieldNodeAdapter.can_write() 0 11 3
A DXFieldNodeAdapter.get_field_value() 0 5 1
A DXFieldNodeAdapter.__init__() 0 3 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
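
In this module, the flagged duplication is the identical `get_field_value()` body in `ATRichTextFieldNodeAdapter` and `DXRichTextFieldNodeAdapter`. One possible fix, sketched here under assumptions, moves that body into a shared mixin. `RichTextValueMixin` is a hypothetical name, and the `Fake*` classes are stand-ins (not part of SENAITE) so the sketch runs without Zope or Plone.

```python
import logging

logger = logging.getLogger(__name__)


class RichTextValueMixin(object):
    """Hypothetical mixin holding the shared rich-text lookup."""

    def get_field_value(self):
        """Return the raw markup of a rich-text value, if it has one."""
        value = self.field.get(self.context)
        if not value:
            return ""
        try:
            return value.raw
        except AttributeError as e:
            logger.info("Imported value has no Attribute 'raw' {}"
                        .format(str(e)))
            return value


# --- Stand-ins for illustration only (assumptions, not SENAITE classes) ---

class FakeField(object):
    """Mimics a field whose get(context) returns a stored value."""

    def __init__(self, value):
        self._value = value

    def get(self, context):
        return self._value


class FakeRichTextValue(object):
    """Mimics a rich-text value exposing the `raw` attribute."""
    raw = "<p>hello</p>"


class FakeAdapter(RichTextValueMixin):
    """Mimics an adapter that provides `context` and `field`."""

    def __init__(self, context, field):
        self.context = context
        self.field = field


rich = FakeAdapter(None, FakeField(FakeRichTextValue())).get_field_value()
empty = FakeAdapter(None, FakeField(None)).get_field_value()
plain = FakeAdapter(None, FakeField("plain text")).get_field_value()
```

In the real module, both adapters would list the mixin before their existing base class (for example `class DXRichTextFieldNodeAdapter(RichTextValueMixin, DXFieldNodeAdapter)`) so that the shared method takes precedence in the method resolution order.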

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like senaite.core.exportimport.genericsetup.adapters often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
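
As an illustration of Extract Class, the archive-path handling used by the file adapters (`get_archive_path()` and the `"/".join(...)` in `set_node_value()`) could live in a small helper of its own. This is a minimal sketch under assumptions: `ArchivePathHelper` is a hypothetical name, and the site path and site ID values are made up for the example rather than taken from the module's configuration.

```python
class ArchivePathHelper(object):
    """Hypothetical extracted class for archive path calculations."""

    def __init__(self, site_path, site_id):
        # site_path: physical path of the portal, e.g. "/senaite"
        # site_id: placeholder that replaces the portal path in the archive
        self.site_path = site_path
        self.site_id = site_id

    def archive_path(self, obj_path):
        """Replace the leading site path with the site ID placeholder."""
        return obj_path.replace(self.site_path, self.site_id, 1)

    def file_path(self, obj_path, filename):
        """Return the path of a file stored next to the object."""
        return "/".join([self.archive_path(obj_path), filename])


# Example values are assumptions chosen for illustration.
helper = ArchivePathHelper(site_path="/senaite", site_id="SITE")
folder = helper.archive_path("/senaite/clients/client-1")
logo = helper.file_path("/senaite/clients/client-1", "logo.png")
```

An adapter would then delegate to the helper instead of computing paths itself, which keeps the path logic testable without a GenericSetup environment.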

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2021 by its authors.
# Some rights reserved, see README and LICENSE.

import json
from datetime import datetime
from mimetypes import guess_type

import six

from bika.lims import api
from bika.lims import logger
from bika.lims.interfaces.field import IUIDReferenceField
from DateTime import DateTime
from plone.app.blob.interfaces import IBlobField
from plone.app.textfield.interfaces import IRichText
from plone.dexterity.interfaces import IDexterityContent
from plone.namedfile.interfaces import INamedField
from Products.Archetypes.interfaces import IBaseObject
from Products.Archetypes.interfaces import IDateTimeField
from Products.Archetypes.interfaces import IField
from Products.Archetypes.interfaces import IFileField
from Products.Archetypes.interfaces import IReferenceField
from Products.Archetypes.interfaces import ITextField
from Products.CMFPlone.utils import safe_unicode
from Products.GenericSetup.interfaces import ISetupEnviron
from Products.GenericSetup.utils import NodeAdapterBase
from senaite.core.api import dtime
from senaite.core.schema.interfaces import IDataGridField
from senaite.core.schema.interfaces import \
    IUIDReferenceField as IUIDReferenceFieldDX
from z3c.form.interfaces import IDataManager
from zope.component import adapts
from zope.component import getMultiAdapter
from zope.interface import implements
from zope.schema.interfaces import IDatetime
from zope.schema.interfaces import IField as ISchemaField
from zope.schema.interfaces import IText
from zope.schema.interfaces import ITextLine
from zope.schema.interfaces import ITuple

from .config import SITE_ID
from .interfaces import IFieldNode
from .interfaces import IRecordField

SKIP_FIELDS = [
    "id",
    "rights",
]


class ATFieldNodeAdapter(NodeAdapterBase):
    """Node im- and exporter for AT Fields.
    """
    implements(IFieldNode)
    adapts(IBaseObject, IField, ISetupEnviron)

    el = "field"

    def __init__(self, context, field, environ):
        super(ATFieldNodeAdapter, self).__init__(context, environ)
        self.field = field

    def can_write(self):
        """Checks if the field is writable
        """
        readonly = getattr(self.field, "readonly", False)
        if readonly:
            return False
        return True

    def set_field_value(self, value, **kw):
        """Set the field value
        """
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
        if not self.can_write():
            logger.info("Skipping readonly field %s.%s" % (
                self.context.__class__.__name__, self.field.__name__))
            return
        return self.field.set(self.context, value, **kw)

    def get_field_value(self):
        """Get the field value
        """
        return self.field.get(self.context)

    def get_json_value(self):
        """JSON converted field value
        """
        value = self.get_field_value()
        try:
            # Always handle the value as unicode
            return json.dumps(safe_unicode(value))
        except TypeError:
            logger.error(
                "ParseError: '{}.{} ('{}')' is not JSON serializable!".format(
                    self.context.getId(), self.field.getName(), repr(value)))
            return ""

    def parse_json_value(self, value):
        return json.loads(value)

    def get_node_value(self, value):
        """Convert the field value to a XML node
        """
        node = self._doc.createElement(self.el)
        node.setAttribute("name", self.field.getName())
        child = self._doc.createTextNode(value)
        node.appendChild(child)
        return node

    def set_node_value(self, node):
        value = self.parse_json_value(node.nodeValue)
        # encode unicodes to UTF8
        if isinstance(value, unicode):
            # [Review issue: Comprehensibility, Best Practice]
            # The variable unicode does not seem to be defined.
            value = value.encode("utf8")
        self.set_field_value(value)

    def _exportNode(self):
        """Export the object as a DOM node.
        """
        value = self.get_json_value()
        return self.get_node_value(value)

    def _importNode(self, node):
        """Import the object from the DOM node.
        """
        if self.field.getName() in SKIP_FIELDS:
            return
        child = node.firstChild
        if child is None:
            return
        if child.nodeName != "#text":
            logger.warning("No textnode found!")
            return False
        self.set_node_value(child)

    node = property(_exportNode, _importNode)


class DXFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for DX Fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ISchemaField, ISetupEnviron)

    def __init__(self, context, field, environ):
        super(DXFieldNodeAdapter, self).__init__(context, field, environ)
        self.field = field

    def can_write(self):
        """Checks if the field is writable
        """
        readonly = getattr(self.field, "readonly", False)
        if readonly:
            return False
        dm = getMultiAdapter((self.context, self.field), IDataManager)
        writable = dm.canWrite()
        if not writable:
            return False
        return True

    def set_node_value(self, node):
        value = self.parse_json_value(node.nodeValue)
        self.set_field_value(value)

    def set_field_value(self, value, **kw):
        """Set the field value
        """
        if not self.can_write():
            logger.info("Skipping readonly field %s.%s" % (
                self.context.__class__.__name__, self.field.__name__))
            return
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
        dm = getMultiAdapter((self.context, self.field), IDataManager)
        dm.set(value, **kw)

    def get_field_value(self):
        """Get the field value
        """
        dm = getMultiAdapter((self.context, self.field), IDataManager)
        return dm.get()


class ATTextFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Text
    """
    adapts(IBaseObject, ITextField, ISetupEnviron)


class ATFileFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Files/Images
    """
    adapts(IBaseObject, IFileField, ISetupEnviron)

    def set_node_value(self, node):
        filename = node.nodeValue
        filepath = "/".join([self.get_archive_path(), filename])
        data = self.get_file_data(filepath)
        self.set_field_value(data, filename=filename)

    def get_archive_path(self):
        """Get the unified archive path
        """
        site = self.environ.getSite()
        site_path = api.get_path(site)
        obj_path = api.get_path(self.context)
        return obj_path.replace(site_path, SITE_ID, 1)

    def get_file_data(self, path):
        """Return the file data from the archive path
        """
        return self.environ.readDataFile(path)

    def get_content_type(self, content, default="application/octet-stream"):
        """Returns the content type of the object
        """
        return getattr(content, "content_type", default)

    def get_json_value(self):
        """Returns the filename
        """
        value = self.get_field_value()

        if isinstance(value, six.string_types):
            return value

        filename = safe_unicode(value.filename) or ""
        data = value.data
        if filename and data:
            path = self.get_archive_path()
            content_type = self.get_content_type(value)
            self.environ.writeDataFile(filename, str(data), content_type, path)
        return filename


class ATBlobFileFieldNodeAdapter(ATFileFieldNodeAdapter):
    """Import/Export AT Files/Images
    """
    adapts(IBaseObject, IBlobField, ISetupEnviron)


class DXNamedFileFieldNodeAdapter(ATBlobFileFieldNodeAdapter):
    """Import/Export DX Files/Images
    """
    adapts(IDexterityContent, INamedField, ISetupEnviron)

    def get_content_type(self, content, default="application/octet-stream"):
        """Returns the content type of the object
        """
        return getattr(content, "contentType", default)

    def set_node_value(self, node):
        filename = node.nodeValue
        filepath = "/".join([self.get_archive_path(), filename])
        data = self.get_file_data(filepath)
        mime_type, encoding = guess_type(filename)
        self.set_field_value(data, filename=filename, content_type=mime_type)

    def set_field_value(self, value, **kw):
        """Set the field value
        """
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
        data = value
        if not data:
            logger.error("Can not set empty file contents")
            return
        filename = kw.get("filename", "")
        contentType = kw.get("mimetype") or kw.get("content_type")
        value = self.field._type(
            data=data, contentType=contentType, filename=filename)
        self.field.set(self.context, value)


class ATDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Date Fields
    """
    adapts(IBaseObject, IDateTimeField, ISetupEnviron)

    def get_json_value(self):
        """Returns the date as ISO string
        """
        value = self.field.get(self.context)
        if not isinstance(value, DateTime):
            return ""
        return value.ISO()

    def parse_json_value(self, value):
        if not value:
            return None
        return DateTime(value)


class DXDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Date Fields
    """
    adapts(IDexterityContent, IDatetime, ISetupEnviron)

    def get_json_value(self):
        """Returns the date as ISO string
        """
        value = self.field.get(self.context)
        if not isinstance(value, datetime):
            return ""
        return dtime.to_iso_format(value)

    def parse_json_value(self, value):
        if not value:
            return None
        # Avoid `UnknownTimeZoneError` by using the date API for conversion
        # also see https://github.com/senaite/senaite.patient/pull/29
        return dtime.to_dt(value)


class ATReferenceFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export UID Reference Fields
    """
    adapts(IBaseObject, IReferenceField, ISetupEnviron)

    def get_json_value(self):
        """Convert referenced objects to UIDs
        """
        value = self.field.get(self.context)
        if api.is_object(value):
            value = api.get_uid(value)
        elif isinstance(value, list):
            value = map(api.get_uid, value)
        else:
            value = ""
        return json.dumps(value)


class ATUIDReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
    """Import/Export UID Reference Fields
    """
    adapts(IBaseObject, IUIDReferenceField, ISetupEnviron)


class ATRecordFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Records Fields
    """
    adapts(IBaseObject, IRecordField, ISetupEnviron)


class ATRichTextFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for AT RichText fields.
    """
    implements(IFieldNode)
    adapts(IBaseObject, IRichText, ISetupEnviron)

    def get_field_value(self):
        # [Review issue: Duplication]
        # This code seems to be duplicated in your project.
        """Get the field value
        """
        value = self.field.get(self.context)
        if not value:
            return ""
        try:
            return value.raw
        except AttributeError as e:
            logger.info("Imported value has no Attribute 'raw' {}"
                        .format(str(e)))
            return value


class DXTupleFieldNodeAdapter(DXFieldNodeAdapter):
    """Node im- and exporter for DX Tuple fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ITuple, ISetupEnviron)

    def set_field_value(self, value, **kw):
        """Set the field value
        """
        if isinstance(value, list):
            value = tuple(value)
        super(DXTupleFieldNodeAdapter, self).set_field_value(value, **kw)


class DXTextLineFieldNodeAdapter(DXFieldNodeAdapter):
    """Node im- and exporter for DX TextLine fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ITextLine, ISetupEnviron)


class DXTextFieldNodeAdapter(DXFieldNodeAdapter):
    """Node im- and exporter for DX Text fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, IText, ISetupEnviron)


class DXRichTextFieldNodeAdapter(DXFieldNodeAdapter):
    """Node im- and exporter for DX RichText fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, IRichText, ISetupEnviron)

    def get_field_value(self):
        # [Review issue: Duplication]
        # This code seems to be duplicated in your project.
        """Get the field value
        """
        value = self.field.get(self.context)
        if not value:
            return ""
        try:
            return value.raw
        except AttributeError as e:
            logger.info("Imported value has no Attribute 'raw' {}"
                        .format(str(e)))
            return value


class DXReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
    """Import/Export DX UID Reference Fields
    """
    adapts(IDexterityContent, IUIDReferenceFieldDX, ISetupEnviron)


class DXDataGridFieldNodeAdapter(ATRecordFieldNodeAdapter):
    """Import/Export of DataGrid Fields
    """
    adapts(IDexterityContent, IDataGridField, ISetupEnviron)
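
The review flags `unicode` as undefined in `ATFieldNodeAdapter.set_node_value()`. The name exists as a builtin on Python 2 only, so a checker running on Python 3 cannot resolve it. One possible way to address the warning, sketched here as an assumption rather than the project's chosen fix, is to compare against a version-independent text type. The module already imports `six`, whose `six.text_type` serves this purpose; the sketch defines an equivalent `text_type` by hand so that it runs without third-party packages. `to_utf8` is a hypothetical helper name.

```python
import sys

# Equivalent of six.text_type, defined locally to keep the sketch
# dependency-free: `unicode` on Python 2, `str` on Python 3.
if sys.version_info[0] >= 3:
    text_type = str
else:
    text_type = unicode  # noqa: F821 (only evaluated on Python 2)


def to_utf8(value):
    """Encode text values to UTF-8 bytes; return other values unchanged."""
    if isinstance(value, text_type):
        return value.encode("utf8")
    return value


encoded = to_utf8(u"caf\xe9")
number = to_utf8(5)
raw_bytes = to_utf8(b"abc")
```

Inside `set_node_value()`, the check would then read `isinstance(value, six.text_type)`, which behaves the same as the current code on Python 2.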