Total Complexity | 56 |
Total Lines | 439 |
Duplicated Lines | 5.47 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like senaite.core.exportimport.genericsetup.adapters often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2021 by its authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import json |
||
22 | from datetime import datetime |
||
23 | from mimetypes import guess_type |
||
24 | |||
25 | import six |
||
26 | |||
27 | from bika.lims import api |
||
28 | from bika.lims import logger |
||
29 | from bika.lims.interfaces.field import IUIDReferenceField |
||
30 | from DateTime import DateTime |
||
31 | from plone.app.blob.interfaces import IBlobField |
||
32 | from plone.app.textfield.interfaces import IRichText |
||
33 | from plone.dexterity.interfaces import IDexterityContent |
||
34 | from plone.namedfile.interfaces import INamedField |
||
35 | from Products.Archetypes.interfaces import IBaseObject |
||
36 | from Products.Archetypes.interfaces import IDateTimeField |
||
37 | from Products.Archetypes.interfaces import IField |
||
38 | from Products.Archetypes.interfaces import IFileField |
||
39 | from Products.Archetypes.interfaces import IReferenceField |
||
40 | from Products.Archetypes.interfaces import ITextField |
||
41 | from Products.CMFPlone.utils import safe_unicode |
||
42 | from Products.GenericSetup.interfaces import ISetupEnviron |
||
43 | from Products.GenericSetup.utils import NodeAdapterBase |
||
44 | from senaite.core.api import dtime |
||
45 | from senaite.core.schema.interfaces import IDataGridField |
||
46 | from senaite.core.schema.interfaces import \ |
||
47 | IUIDReferenceField as IUIDReferenceFieldDX |
||
48 | from z3c.form.interfaces import IDataManager |
||
49 | from zope.component import adapts |
||
50 | from zope.component import getMultiAdapter |
||
51 | from zope.interface import implements |
||
52 | from zope.schema.interfaces import IDatetime |
||
53 | from zope.schema.interfaces import IField as ISchemaField |
||
54 | from zope.schema.interfaces import IText |
||
55 | from zope.schema.interfaces import ITextLine |
||
56 | from zope.schema.interfaces import ITuple |
||
57 | |||
58 | from .config import SITE_ID |
||
59 | from .interfaces import IFieldNode |
||
60 | from .interfaces import IRecordField |
||
61 | |||
# Names of fields that ATFieldNodeAdapter._importNode never imports
SKIP_FIELDS = ["id", "rights"]
||
66 | |||
67 | |||
class ATFieldNodeAdapter(NodeAdapterBase):
    """Node im- and exporter for AT Fields.

    Adapts (content object, field, setup environ). On export, the field
    value is JSON encoded and wrapped in a `<field name="...">` element.
    On import, the text child of such an element is JSON decoded and
    written back to the field.
    """
    implements(IFieldNode)
    adapts(IBaseObject, IField, ISetupEnviron)

    # Tag name of the XML element created for an exported field
    el = "field"

    def __init__(self, context, field, environ):
        """Remember the adapted field next to the context/environ pair

        :param context: the content object that holds the field
        :param field: the field to im-/export
        :param environ: the setup environ passed on to the base adapter
        """
        super(ATFieldNodeAdapter, self).__init__(context, environ)
        self.field = field

    def can_write(self):
        """Checks if the field is writable

        :returns: False if the field has a truthy `readonly` attribute,
            True otherwise
        """
        readonly = getattr(self.field, "readonly", False)
        if readonly:
            return False
        return True

    def set_field_value(self, value, **kw):
        """Set the field value

        Readonly fields are skipped with a log message.

        :param value: the value to store
        :param kw: additional keywords passed through to `field.set`
        :returns: the result of `field.set`, or None if skipped
        """
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
        if not self.can_write():
            logger.info("Skipping readonly field %s.%s" % (
                self.context.__class__.__name__, self.field.__name__))
            return
        return self.field.set(self.context, value, **kw)

    def get_field_value(self):
        """Get the field value
        """
        return self.field.get(self.context)

    def get_json_value(self):
        """JSON converted field value

        :returns: the JSON string of the field value, or an empty string
            if the value is not JSON serializable (an error is logged)
        """
        value = self.get_field_value()
        try:
            # Always handle the value as unicode
            return json.dumps(safe_unicode(value))
        except TypeError:
            logger.error(
                "ParseError: '{}.{} ('{}')' is not JSON serializable!".format(
                    self.context.getId(), self.field.getName(), repr(value)))
            return ""

    def parse_json_value(self, value):
        """Decode the JSON text of an imported node

        :param value: JSON string read from the XML node
        :returns: the decoded Python value
        """
        return json.loads(value)

    def get_node_value(self, value):
        """Convert the field value to a XML node

        :param value: the (already serialized) text to put into the node
        :returns: a new element named after `self.el`, carrying the field
            name in its `name` attribute and `value` as text child
        """
        node = self._doc.createElement(self.el)
        node.setAttribute("name", self.field.getName())
        child = self._doc.createTextNode(value)
        node.appendChild(child)
        return node

    def set_node_value(self, node):
        """Decode the node text and write it to the field

        :param node: the text node holding the JSON encoded value
        """
        value = self.parse_json_value(node.nodeValue)
        # encode unicodes to UTF8
        # NOTE: the builtin `unicode` only exists on Python 2.
        # `six.text_type` is the very same type there and avoids a
        # NameError on Python 3.
        if isinstance(value, six.text_type):
            value = value.encode("utf8")
        self.set_field_value(value)

    def _exportNode(self):
        """Export the object as a DOM node.
        """
        value = self.get_json_value()
        return self.get_node_value(value)

    def _importNode(self, node):
        """Import the object from the DOM node.

        Nothing is imported for fields listed in `SKIP_FIELDS` or for
        nodes without a child. If the first child is not a text node, a
        warning is logged and False is returned.
        """
        if self.field.getName() in SKIP_FIELDS:
            return
        child = node.firstChild
        if child is None:
            return
        if child.nodeName != "#text":
            logger.warning("No textnode found!")
            return False
        self.set_node_value(child)

    # Reading `node` exports the field, assigning to it imports the value
    node = property(_exportNode, _importNode)
||
155 | |||
156 | |||
class DXFieldNodeAdapter(ATFieldNodeAdapter):
    """Im-/exports the value of a DX schema field as XML node.

    Reads and writes go through the `IDataManager` multi adapter that is
    looked up for the (context, field) pair.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ISchemaField, ISetupEnviron)

    def __init__(self, context, field, environ):
        super(DXFieldNodeAdapter, self).__init__(context, field, environ)
        self.field = field

    def can_write(self):
        """Tell whether the field may be written

        :returns: False for fields flagged `readonly` or when the data
            manager refuses writing, True otherwise
        """
        if getattr(self.field, "readonly", False):
            return False
        manager = getMultiAdapter((self.context, self.field), IDataManager)
        return True if manager.canWrite() else False

    def set_node_value(self, node):
        """Write the JSON decoded node text to the field
        """
        self.set_field_value(self.parse_json_value(node.nodeValue))

    def set_field_value(self, value, **kw):
        """Store the value through the data manager

        Non-writable fields are skipped with a log message.
        """
        if self.can_write():
            # logger.info("Set: {} -> {}".format(self.field.getName(), value))
            manager = getMultiAdapter(
                (self.context, self.field), IDataManager)
            manager.set(value, **kw)
            return
        logger.info("Skipping readonly field %s.%s" % (
            self.context.__class__.__name__, self.field.__name__))

    def get_field_value(self):
        """Fetch the value through the data manager
        """
        return getMultiAdapter(
            (self.context, self.field), IDataManager).get()
||
199 | |||
200 | |||
class ATTextFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for AT Text fields.

    Only registers for `ITextField`; the behavior is inherited unchanged.
    """
    adapts(IBaseObject, ITextField, ISetupEnviron)
||
205 | |||
206 | |||
class ATFileFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export Files/Images

    The XML node only carries the filename. The file contents are written
    to / read from a data file of the setup environ, located below the
    archive path of the context.
    """
    adapts(IBaseObject, IFileField, ISetupEnviron)

    def set_node_value(self, node):
        """Load the file named in the node from the archive into the field

        :param node: text node holding the filename
        """
        filename = node.nodeValue
        filepath = "/".join([self.get_archive_path(), filename])
        data = self.get_file_data(filepath)
        self.set_field_value(data, filename=filename)

    def get_archive_path(self):
        """Get the unified archive path

        :returns: the path of the context with the leading site path
            replaced by `SITE_ID`
        """
        site = self.environ.getSite()
        site_path = api.get_path(site)
        obj_path = api.get_path(self.context)
        # only replace the first occurrence, i.e. the site prefix
        return obj_path.replace(site_path, SITE_ID, 1)

    def get_file_data(self, path):
        """Return the file data from the archive path
        """
        return self.environ.readDataFile(path)

    def get_content_type(self, content, default="application/octet-stream"):
        """Returns the content type of the object

        :param content: the file value to inspect
        :param default: returned if `content` has no `content_type`
        """
        return getattr(content, "content_type", default)

    def get_json_value(self):
        """Returns the filename

        If the value has both a filename and data, the data is written as
        data file below the archive path of the context. String values are
        returned as they are.

        :returns: the filename, or an empty string if there is none
        """
        value = self.get_field_value()

        # A field without any stored value has neither `filename` nor
        # `data`; export an empty filename instead of failing with an
        # AttributeError below.
        if value is None:
            return ""

        if isinstance(value, six.string_types):
            return value

        filename = safe_unicode(value.filename) or ""
        data = value.data
        if filename and data:
            path = self.get_archive_path()
            content_type = self.get_content_type(value)
            self.environ.writeDataFile(filename, str(data), content_type, path)
        return filename
||
251 | |||
252 | |||
class ATBlobFileFieldNodeAdapter(ATFileFieldNodeAdapter):
    """Node im- and exporter for AT Blob fields (files/images).

    Only registers for `IBlobField`; the behavior is inherited unchanged.
    """
    adapts(IBaseObject, IBlobField, ISetupEnviron)
||
257 | |||
258 | |||
class DXNamedFileFieldNodeAdapter(ATBlobFileFieldNodeAdapter):
    """Node im- and exporter for DX named file/image fields.
    """
    adapts(IDexterityContent, INamedField, ISetupEnviron)

    def get_content_type(self, content, default="application/octet-stream"):
        """Read the `contentType` of the value, falling back to `default`
        """
        return getattr(content, "contentType", default)

    def set_node_value(self, node):
        """Load the file named in the node from the archive into the field

        The content type is guessed from the filename.
        """
        filename = node.nodeValue
        archive_path = self.get_archive_path()
        data = self.get_file_data("/".join([archive_path, filename]))
        # guess_type returns (type, encoding); only the type is needed
        mime_type = guess_type(filename)[0]
        self.set_field_value(data, filename=filename, content_type=mime_type)

    def set_field_value(self, value, **kw):
        """Wrap the raw data into the field's value type and store it

        Empty data is not stored; an error is logged instead. The content
        type is taken from the `mimetype` or `content_type` keyword.
        """
        # logger.info("Set: {} -> {}".format(self.field.getName(), value))
        if not value:
            logger.error("Can not set empty file contents")
            return
        named_file = self.field._type(
            data=value,
            contentType=kw.get("mimetype") or kw.get("content_type"),
            filename=kw.get("filename", ""))
        self.field.set(self.context, named_file)
||
289 | |||
290 | |||
class ATDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for AT DateTime fields.
    """
    adapts(IBaseObject, IDateTimeField, ISetupEnviron)

    def get_json_value(self):
        """Serialize the stored date

        :returns: the ISO string of a `DateTime` value, or an empty string
            for any other value
        """
        stored = self.field.get(self.context)
        if isinstance(stored, DateTime):
            return stored.ISO()
        return ""

    def parse_json_value(self, value):
        """Convert the node text to a `DateTime`, or None if it is empty
        """
        return DateTime(value) if value else None
||
308 | |||
309 | |||
class DXDateTimeFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for DX Datetime fields.
    """
    adapts(IDexterityContent, IDatetime, ISetupEnviron)

    def get_json_value(self):
        """Serialize the stored date

        :returns: the ISO formatted string of a `datetime` value, or an
            empty string for any other value
        """
        stored = self.field.get(self.context)
        if isinstance(stored, datetime):
            return dtime.to_iso_format(stored)
        return ""

    def parse_json_value(self, value):
        """Convert the node text to a date, or None if it is empty
        """
        # Avoid `UnknownTimeZoneError` by using the date API for conversion
        # also see https://github.com/senaite/senaite.patient/pull/29
        return dtime.to_dt(value) if value else None
||
329 | |||
330 | |||
class ATReferenceFieldNodeAdapter(ATFieldNodeAdapter):
    """Import/Export UID Reference Fields

    References are exported by UID instead of by object.
    """
    adapts(IBaseObject, IReferenceField, ISetupEnviron)

    def get_json_value(self):
        """Convert referenced objects to UIDs

        :returns: JSON string of the UID for a single object, of a list of
            UIDs for a list of objects, or of an empty string for any
            other value
        """
        value = self.field.get(self.context)
        if api.is_object(value):
            value = api.get_uid(value)
        elif isinstance(value, list):
            # Build a real list: `map` returns an iterator on Python 3,
            # which `json.dumps` can not serialize.
            value = [api.get_uid(obj) for obj in value]
        else:
            value = ""
        return json.dumps(value)
||
347 | |||
348 | |||
class ATUIDReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
    """Node im- and exporter for AT UID Reference fields.

    Only registers for `IUIDReferenceField`; the behavior is inherited
    unchanged.
    """
    adapts(IBaseObject, IUIDReferenceField, ISetupEnviron)
||
353 | |||
354 | |||
class ATRecordFieldNodeAdapter(ATFieldNodeAdapter):
    """Node im- and exporter for AT Records fields.

    Only registers for `IRecordField`; the behavior is inherited unchanged.
    """
    adapts(IBaseObject, IRecordField, ISetupEnviron)
||
359 | |||
360 | |||
class ATRichTextFieldNodeAdapter(ATFieldNodeAdapter):
    """Im-/exports the raw text of RichText fields on AT contents.
    """
    implements(IFieldNode)
    adapts(IBaseObject, IRichText, ISetupEnviron)

    def get_field_value(self):
        """Read the raw text of the stored value

        :returns: an empty string for falsy values, the `raw` attribute of
            the value if it has one, otherwise the value itself
        """
        stored = self.field.get(self.context)
        if not stored:
            return ""
        try:
            raw = stored.raw
        except AttributeError as exc:
            logger.info("Imported value has no Attribute 'raw' {}"
                        .format(str(exc)))
            return stored
        return raw
||
379 | |||
380 | |||
class DXTupleFieldNodeAdapter(DXFieldNodeAdapter):
    """Im-/exports the value of DX Tuple fields.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ITuple, ISetupEnviron)

    def set_field_value(self, value, **kw):
        """Store the value, converting lists to tuples beforehand
        """
        converted = tuple(value) if isinstance(value, list) else value
        super(DXTupleFieldNodeAdapter, self).set_field_value(converted, **kw)
||
393 | |||
394 | |||
class DXTextLineFieldNodeAdapter(DXFieldNodeAdapter):
    """Im-/exports the value of DX TextLine fields.

    Only registers for `ITextLine`; the behavior is inherited unchanged.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, ITextLine, ISetupEnviron)
||
400 | |||
401 | |||
class DXTextFieldNodeAdapter(DXFieldNodeAdapter):
    """Im-/exports the value of DX Text fields.

    Only registers for `IText`; the behavior is inherited unchanged.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, IText, ISetupEnviron)
||
407 | |||
408 | |||
class DXRichTextFieldNodeAdapter(DXFieldNodeAdapter):
    """Im-/exports the raw text of RichText fields on DX contents.
    """
    implements(IFieldNode)
    adapts(IDexterityContent, IRichText, ISetupEnviron)

    def get_field_value(self):
        """Read the raw text of the stored value

        :returns: an empty string for falsy values, the `raw` attribute of
            the value if it has one, otherwise the value itself
        """
        stored = self.field.get(self.context)
        if not stored:
            return ""
        try:
            raw = stored.raw
        except AttributeError as exc:
            logger.info("Imported value has no Attribute 'raw' {}"
                        .format(str(exc)))
            return stored
        return raw
||
427 | |||
428 | |||
class DXReferenceFieldNodeAdapter(ATReferenceFieldNodeAdapter):
    """Node im- and exporter for DX UID Reference fields.

    Only registers for the DX `IUIDReferenceField`; the behavior is
    inherited unchanged.
    """
    adapts(IDexterityContent, IUIDReferenceFieldDX, ISetupEnviron)
||
433 | |||
434 | |||
class DXDataGridFieldNodeAdapter(ATRecordFieldNodeAdapter):
    """Node im- and exporter for DX DataGrid fields.

    Only registers for `IDataGridField`; the behavior is inherited
    unchanged.
    """
    adapts(IDexterityContent, IDataGridField, ISetupEnviron)
||
439 |