Passed
Push — 2.x ( 97ce14...62b538 )
by Ramon
05:15
created

RemarksField._parse_legacy_remarks()   B

Complexity

Conditions 6

Size

Total Lines 51
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 51
rs 8.3706
c 0
b 0
f 0
cc 6
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import re
22
23
import six
24
25
from AccessControl import ClassSecurityInfo
26
from bika.lims import api
27
from bika.lims.browser.widgets import RemarksWidget
28
from bika.lims.events import RemarksAddedEvent
29
from bika.lims.interfaces import IRemarksField
30
from bika.lims.utils import tmpID
31
from DateTime import DateTime
32
from Products.Archetypes.event import ObjectEditedEvent
33
from Products.Archetypes.Field import ObjectField
34
from Products.Archetypes.Registry import registerField
35
from Products.CMFPlone.i18nl10n import ulocalized_time
36
from zope import event
37
from zope.interface import implements
38
39
40
class RemarksHistory(list):
    """A list containing a remarks history, but __str__ returns the legacy
    format from instances prior v1.3.3
    """

    def html(self):
        """Returns the legacy string representation converted to HTML
        """
        return api.text_to_html(str(self))

    def __str__(self):
        """Returns the remarks in legacy format

        Every entry is rendered with `str`; entries that render to an empty
        string are skipped and the remaining ones are joined by newlines.
        """
        rendered = (str(record) for record in self)
        return "\n".join(text for text in rendered if text)

    def __eq__(self, y):
        """Compares against strings by the legacy representation, against
        anything else with the regular list comparison
        """
        if isinstance(y, six.string_types):
            return str(self) == y
        return super(RemarksHistory, self).__eq__(y)

    def __ne__(self, y):
        """Inverse of `__eq__`

        Python 2 does not derive `!=` from `__eq__`, so without this method
        a comparison against a string could be "equal" and "not equal" at
        the same time.
        """
        result = self.__eq__(y)
        if result is NotImplemented:
            # let Python try the reflected operation of the other operand
            return result
        return not result
59
60
61
class RemarksHistoryRecord(dict):
    """Dictionary holding a single entry of the Remarks History

    The keys "id", "user_id", "user_name", "created" and "content" are always
    present after initialization.
    """

    def __init__(self, *arg, **kw):
        super(RemarksHistoryRecord, self).__init__(*arg, **kw)

        # generate an identifier only if none (or an empty one) was passed in
        record_id = self.id
        if not record_id:
            record_id = tmpID()

        # same for the creation date: default to the current date/time
        created = self.created
        if not created:
            created = DateTime().ISO()

        # make sure all the keys of a record do exist
        self["id"] = record_id
        self["user_id"] = self.user_id
        self["user_name"] = self.user_name
        self["created"] = created
        self["content"] = self.content

    @property
    def id(self):
        """Identifier of the record or empty string
        """
        return self.get("id", "")

    @property
    def user_id(self):
        """Id of the user stored in the record or empty string
        """
        return self.get("user_id", "")

    @property
    def user_name(self):
        """Name of the user stored in the record or empty string
        """
        return self.get("user_name", "")

    @property
    def created(self):
        """Creation date stored in the record or empty string
        """
        return self.get("created", "")

    @property
    def created_ulocalized(self):
        """Creation date passed through `ulocalized_time` in long format
        """
        created = self.created
        portal = api.get_portal()
        request = api.get_request()
        return ulocalized_time(
            created, long_format=True, context=portal, request=request,
            domain="senaite.core")

    @property
    def content(self):
        """Text of the remark or empty string
        """
        return self.get("content", "")

    @property
    def html_content(self):
        """Text of the remark converted with `api.text_to_html`
        """
        return api.text_to_html(self.content)

    def __str__(self):
        """Returns a legacy string format of the Remarks record
        """
        # a record without text renders to nothing at all
        if not self.content:
            return ""

        # without date or user there is no legacy header to build
        if not (self.created and self.user_id):
            return self.content

        header = "=== {} ({})".format(self.created, self.user_id)
        return "{}\n{}".format(header, self.content)
115
116
117
class RemarksField(ObjectField):
    """A field that stores remarks as a RemarksHistory.

    A string or a single record submitted to the setter is added in front of
    the existing history; a list, tuple or RemarksHistory replaces the whole
    history. Legacy string values are converted to a history on read.
    """

    _properties = ObjectField._properties.copy()
    _properties.update({
        'type': 'remarks',
        'widget': RemarksWidget,
        'default': '',
    })

    implements(IRemarksField)
    security = ClassSecurityInfo()

    @property
    def searchable(self):
        """Returns False, preventing this field from being searchable by
        AT's SearchableText
        """
        return False

    @security.private
    def set(self, instance, value, **kwargs):
        """Stores the remarks in the field

        :param instance: the object that holds the field
        :param value: a RemarksHistory or list/tuple of records (replaces the
            history), a RemarksHistoryRecord or a string (inserted as the
            first entry of the existing history)
        :raises ValueError: if the type of the value is not supported

        Falsy values are ignored. For non-temporary objects the object is
        reindexed and the edited/remarks-added events are notified.
        """
        if not value:
            return

        if isinstance(value, RemarksHistory):
            # Override the whole history here
            history = value

        elif isinstance(value, (list, tuple)):
            # This is a list, convert to RemarksHistory
            history = RemarksHistory(
                [RemarksHistoryRecord(item) for item in value])

        elif isinstance(value, RemarksHistoryRecord):
            # This is a record, put it in front of the history
            history = self.get_history(instance)
            history.insert(0, value)

        elif isinstance(value, six.string_types):
            # Create a new history record and put it in front of the history
            record = self.to_history_record(value)
            history = self.get_history(instance)
            history.insert(0, record)

        else:
            raise ValueError("Type not supported: {}".format(type(value)))

        # filter nasty html in the complete history
        for record in history:
            content = record.get("content")
            record["content"] = self.to_safe_html(content)

        # Store the data
        ObjectField.set(self, instance, history)

        if api.is_temporary(instance):
            return

        # N.B. ensure updated catalog metadata for the snapshot
        instance.reindexObject()

        # notify object edited event
        event.notify(ObjectEditedEvent(instance))

        # notify new remarks for e.g. later email notification etc.
        event.notify(RemarksAddedEvent(instance, history))

    def to_safe_html(self, html):
        """Converts the text with the "text/x-html-safe" portal transform and
        returns the data of the resulting stream
        """
        # see: Products.PortalTransforms.tests.test_xss
        pt = api.get_tool("portal_transforms")
        stream = pt.convertTo("text/x-html-safe", html)
        return stream.getData()

    def get(self, instance, **kwargs):
        """Returns a RemarksHistory object
        """
        return self.get_history(instance)

    def getRaw(self, instance, **kwargs):
        """Returns raw field value (possible wrapped in BaseUnit)
        """
        value = ObjectField.get(self, instance, **kwargs)
        # getattr(instance, "Remarks") returns a BaseUnit
        if callable(value):
            value = value()
        return value

    def to_history_record(self, value):
        """Transforms the value to a history record

        The record is attributed to the current user. The full name is taken
        from the user's contact if there is one, otherwise from the user
        properties.
        """
        user = api.get_current_user()
        contact = api.get_user_contact(user)
        if contact:
            fullname = contact.getFullname() or ""
        else:
            # get the fullname from the user properties
            props = api.get_user_properties(user)
            fullname = props.get("fullname", "")
        return RemarksHistoryRecord(user_id=user.id,
                                    user_name=fullname,
                                    content=value.strip())

    def get_history(self, instance):
        """Returns a RemarksHistory object with the remarks entries
        """
        remarks = instance.getRawRemarks()
        if not remarks:
            return RemarksHistory()

        if not isinstance(remarks, six.string_types):
            return remarks

        # Backwards compatibility with legacy from < v1.3.3
        parsed_remarks = self._parse_legacy_remarks(remarks)
        if not parsed_remarks:
            # Nothing could be parsed (e.g. plain text without any legacy
            # delimiter): keep the whole text as a single record instead of
            # silently dropping it
            remark = RemarksHistoryRecord(content=remarks.strip())
            return RemarksHistory([remark, ])

        return RemarksHistory(
            [RemarksHistoryRecord(record) for record in parsed_remarks])

    def _parse_legacy_remarks(self, text):
        """Parse legacy remarks from the text

        :param text: remarks in the legacy string format
        :returns: list of dicts with the keys "created", "user_id",
            "user_name" and "content", or None if the text does not contain
            any legacy delimiter
        """
        # split legacy remarks on the complete delimiter, e.g.:
        # === Tue, 28 Jan 2020 06:53:58 +0100 (admin)\nThis is a Test
        parts = re.split(r"(===) ([A-Za-z]{3}, \d{1,2} [A-Za-z]{3} \d{2,4} \d{2}:\d{2}:\d{2} [+-]{1}\d{4}) \((.*?)\)", text)  # noqa

        # The pattern has three groups, so the split result looks like:
        #   [leading, "===", created, userid, content, "===", created, ...]
        # NOTE: the text in front of the first delimiter (usually empty) is
        #       not part of any record and is skipped
        chunks = parts[1:]
        if not chunks:
            # no delimiter found -> let the caller handle the plain text
            return None

        remarks = []
        for start in range(0, len(chunks), 4):
            chunk = chunks[start:start + 4]

            # each record must contain the marker, date, user-id and text
            # -> we invalidate the whole parsing if this is not given
            if len(chunk) != 4:
                return None

            _marker, created, userid, content = chunk

            # try to get the full name of the user id
            fullname = self._get_fullname_from_user_id(userid)

            # strip off leading and trailing escape sequences from the content
            content = content.strip("\n\r\t")

            # append a remarks record
            remarks.append({
                "created": created,
                "user_id": userid,
                "user_name": fullname,
                "content": content,
            })

        return remarks

    def _get_fullname_from_user_id(self, userid, default=""):
        """Try to get the fullname of the user

        The full name of the user's contact takes precedence over the
        "fullname" user property; the default is returned if the user does
        not exist or no name is set.
        """
        user = api.get_user(userid)
        if not user:
            return default

        props = api.get_user_properties(user)
        fullname = props.get("fullname", default)

        contact = api.get_user_contact(user)
        if contact:
            fullname = contact.getFullname() or fullname
        return fullname
309
310
311
# Register the RemarksField class via Products.Archetypes.Registry
registerField(RemarksField, title="Remarks", description="")
312