Passed
Push — 2.x ( 52074a...5a6c12 )
by Ramon
06:28
created

bika.lims.api.snapshot   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 465
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 53
eloc 188
dl 0
loc 465
rs 6.96
c 0
b 0
f 0

23 Functions

Rating   Name   Duplication   Size   Complexity  
A get_snapshot_by_version() 0 16 3
A diff_values() 0 25 3
C _process_value() 0 31 10
A take_snapshot() 0 34 2
A get_snapshots() 0 8 1
A get_object_metadata() 0 29 1
A get_storage() 0 10 2
A get_snapshot_count() 0 12 2
A get_request_data() 0 56 3
B compare_snapshots() 0 24 6
A _objectdata_cache_key() 0 7 1
A compare_last_two_snapshots() 0 13 2
A has_snapshots() 0 8 1
A supports_snapshots() 0 11 2
A get_snapshot_metadata() 0 7 1
A get_object_data() 0 23 2
A pause_snapshots_for() 0 4 1
A resume_snapshots_for() 0 4 1
A get_last_snapshot() 0 8 1
A get_version() 0 10 1
A _get_title_or_id_from_uid() 0 11 3
A get_snapshot_version() 0 11 2
A disable_snapshots() 0 14 2

How to fix   Complexity   

Complexity

Complex classes like bika.lims.api.snapshot often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
23
import six
24
25
from bika.lims import _
26
from bika.lims import api
27
from bika.lims import logger
28
from bika.lims.api.security import get_roles
29
from bika.lims.api.security import get_user_id
30
from bika.lims.interfaces import IAuditable
31
from bika.lims.interfaces import IDoNotSupportSnapshots
32
from DateTime import DateTime
33
from persistent.list import PersistentList
34
from plone.memoize.ram import cache
35
from senaite.app.supermodel import SuperModel
36
from zope.annotation.interfaces import IAnnotatable
37
from zope.annotation.interfaces import IAnnotations
38
from zope.interface import alsoProvides
39
from zope.interface import noLongerProvides
40
41
SNAPSHOT_STORAGE = "senaite.core.snapshots"
42
43
44
def _objectdata_cache_key(func, obj):
45
    """Cache Key for object data
46
    """
47
    uid = api.get_uid(obj)
48
    modified = api.get_modification_date(obj).millis()
49
    review_state = api.get_review_status(obj)
50
    return "{}-{}-{}".format(uid, review_state, modified)
51
52
53
def supports_snapshots(obj):
54
    """Checks if the object supports snapshots
55
56
    Only objects which can hold an annotation storage can be auditable
57
58
    :param obj: Content object
59
    :returns: True/False
60
    """
61
    if IDoNotSupportSnapshots.providedBy(obj):
62
        return False
63
    return IAnnotatable.providedBy(obj)
64
65
66
def get_storage(obj):
67
    """Get or create the audit log storage for the given object
68
69
    :param obj: Content object
70
    :returns: PersistentList
71
    """
72
    annotation = IAnnotations(obj)
73
    if annotation.get(SNAPSHOT_STORAGE) is None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SNAPSHOT_STORAGE does not seem to be defined.
Loading history...
74
        annotation[SNAPSHOT_STORAGE] = PersistentList()
75
    return annotation[SNAPSHOT_STORAGE]
76
77
78
def get_snapshots(obj):
79
    """Get all snapshots from the storage
80
81
    :param obj: Content object
82
    :returns: List of snapshot dictionaries
83
    """
84
    snapshots = get_storage(obj)
85
    return map(json.loads, snapshots)
86
87
88
def has_snapshots(obj):
89
    """Checks if the object has snapshots
90
91
    :param obj: Content object
92
    :returns: True/False
93
    """
94
    storage = get_storage(obj)
95
    return len(storage) > 0
96
97
98
def get_snapshot_count(obj):
99
    """Returns the number of snapsots
100
101
    :param obj: Content object
102
    :returns: Current snapshots in the storage
103
    """
104
    try:
105
        annotation = IAnnotations(obj)
106
    except TypeError:
107
        return 0
108
    storage = annotation.get(SNAPSHOT_STORAGE, [])
109
    return len(storage)
110
111
112
def get_version(obj):
113
    """Returns the version of the object
114
115
    NOTE: Object versions start with 0!
116
117
    :param obj: Content object
118
    :returns: Current version of the object or -1
119
    """
120
    count = get_snapshot_count(obj)
121
    return count - 1
122
123
124
def get_snapshot_by_version(obj, version=0):
125
    """Get a snapshot by version
126
127
    Snapshot versions begin with `0`, because this is the first index of the
128
    storage, which is a list.
129
130
    :param obj: Content object
131
    :param version: The index position of the snapshot in the storage
132
    :returns: Snapshot at the given index position
133
    """
134
    if version < 0:
135
        return None
136
    snapshots = get_snapshots(obj)
137
    if version > len(snapshots) - 1:
138
        return None
139
    return snapshots[version]
140
141
142
def get_snapshot_version(obj, snapshot):
143
    """Returns the version of the given snapshot
144
145
    :param obj: Content object
146
    :param snapshot: Snapshot dictionary
147
    :returns: Index where the object is located
148
    """
149
    snapshots = get_snapshots(obj)
150
    if snapshot not in snapshots:
151
        return -1
152
    return snapshots.index(snapshot)
153
154
155
def get_last_snapshot(obj):
156
    """Get the last snapshot
157
158
    :param obj: Content object
159
    :returns: Last Snapshot or None
160
    """
161
    version = get_version(obj)
162
    return get_snapshot_by_version(obj, version)
163
164
165
def get_snapshot_metadata(snapshot):
166
    """Returns the snapshot metadata
167
168
    :param snapshot: Snapshot dictionary
169
    :returns: Metadata dictionary of the snapshot
170
    """
171
    return snapshot.get("__metadata__", {})
172
173
174
@cache(_objectdata_cache_key)
175
def get_object_data(obj):
176
    """Get object schema data
177
178
    NOTE: We RAM cache this data because it should only change when the object
179
    was modified!
180
181
    XXX: We need to set at least the modification date when we set fields in
182
    Ajax Listing when we take a snapshot there!
183
184
    :param obj: Content object
185
    :returns: Dictionary of extracted schema data
186
    """
187
188
    try:
189
        model = SuperModel(obj)
190
        data = model.to_dict()
191
    except Exception as exc:
192
        logger.error("Failed to get schema data for {}: {}"
193
                     .format(repr(obj), str(exc)))
194
        data = {}
195
196
    return data
197
198
199
def get_request_data(request=None):
200
    """Get request header/form data
201
202
    A typical request behind NGINX looks like this:
203
204
    {
205
        'CONNECTION_TYPE': 'close',
206
        'CONTENT_LENGTH': '52',
207
        'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=UTF-8',
208
        'GATEWAY_INTERFACE': 'CGI/1.1',
209
        'HTTP_ACCEPT': 'application/json, text/javascript, */*; q=0.01',
210
        'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br',
211
        'HTTP_ACCEPT_LANGUAGE': 'de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
212
        'HTTP_COOKIE': '_ga=GA1.2.1058345096.1522506452; ...',
213
        'HTTP_HOST': 'senaite.ridingbytes.com',
214
        'HTTP_ORIGIN': 'https://senaite.ridingbytes.com',
215
        'HTTP_REFERER': 'https://senaite.ridingbytes.com/clients/client-1/H2O-0054',
216
        'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
217
        'HTTP_X_FORWARDED_FOR': '93.238.47.95',
218
        'HTTP_X_REAL_IP': '93.238.47.95',
219
        'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
220
        'PATH_INFO': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot//@@API/update',
221
        'PATH_TRANSLATED': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot/@@API/update',
222
        'QUERY_STRING': '',
223
        'REMOTE_ADDR': '127.0.0.1',
224
        'REQUEST_METHOD': 'POST',
225
        'SCRIPT_NAME': '',
226
        'SERVER_NAME': 'localhost',
227
        'SERVER_PORT': '8081',
228
        'SERVER_PROTOCOL': 'HTTP/1.0',
229
        'SERVER_SOFTWARE': 'Zope/(2.13.28, python 2.7.12, linux2) ZServer/1.1',
230
        'channel.creation_time': 1556086048
231
    }
232
233
    :param request: Request object
234
    :returns: Dictionary of extracted request header/form data
235
    """
236
237
    if request is None:
238
        # get the request
239
        request = api.get_request()
240
241
    # Happens in the test runner
242
    if not request:
243
        return {}
244
245
    # Try to obtain the real IP address of the client
246
    forwarded_for = request.get_header("X_FORWARDED_FOR")
247
    real_ip = request.get_header("X_REAL_IP")
248
    remote_address = request.get_header("REMOTE_ADDR")
249
250
    return {
251
        "comments": request.form.get("comments", ""),
252
        "remote_address": forwarded_for or real_ip or remote_address,
253
        "user_agent": request.get_header("HTTP_USER_AGENT"),
254
        "referer": request.get_header("HTTP_REFERER"),
255
    }
256
257
258
def get_object_metadata(obj, **kw):
259
    """Get object metadata
260
261
    :param obj: Content object
262
    :returns: Dictionary of extracted object metadata
263
    """
264
265
    # inject metadata of volatile data
266
    metadata = {
267
        "actor": get_user_id(),
268
        "roles": get_roles(),
269
        "action": "",
270
        "review_state": api.get_review_status(obj),
271
        "active": api.is_active(obj),
272
        "snapshot_created": DateTime().ISO(),
273
        "modified": api.get_modification_date(obj).ISO(),
274
        "remote_address": "",
275
        "user_agent": "",
276
        "referer": "",
277
        "comments": "",
278
    }
279
280
    # Update request data
281
    metadata.update(get_request_data())
282
283
    # allow metadata overrides
284
    metadata.update(kw)
285
286
    return metadata
287
288
289
def take_snapshot(obj, store=True, **kw):
290
    """Takes a snapshot of the passed in object
291
292
    :param obj: Content object
293
    :returns: New snapshot
294
    """
295
    logger.debug("📷 Take new snapshot for {}".format(repr(obj)))
296
297
    # get the object data
298
    snapshot = get_object_data(obj)
299
300
    # get the metadata
301
    metadata = get_object_metadata(obj, **kw)
302
303
    # store the metadata
304
    snapshot["__metadata__"] = metadata
305
306
    # convert the snapshot to JSON
307
    data = json.dumps(snapshot)
308
309
    # return immediately
310
    if not store:
311
        return snapshot
312
313
    # get the snapshot storage
314
    storage = get_storage(obj)
315
316
    # store the snapshot data
317
    storage.append(data)
318
319
    # Mark the content as auditable
320
    alsoProvides(obj, IAuditable)
321
322
    return snapshot
323
324
325
def pause_snapshots_for(obj):
326
    """Pause snapshots for the given object
327
    """
328
    alsoProvides(obj, IDoNotSupportSnapshots)
329
330
331
def resume_snapshots_for(obj):
332
    """Resume snapshots for the given object
333
    """
334
    noLongerProvides(obj, IDoNotSupportSnapshots)
335
336
337
def compare_snapshots(snapshot_a, snapshot_b, raw=False):
338
    """Returns a diff of two given snapshots (dictionaries)
339
340
    :param snapshot_a: First snapshot
341
    :param snapshot_b: Second snapshot
342
    :param raw: True to compare the raw values, e.g. UIDs
343
    :returns: Dictionary of field/value pairs that differ
344
    """
345
    if not all(map(lambda x: isinstance(x, dict),
346
                   [snapshot_a, snapshot_b])):
347
        return {}
348
349
    diffs = {}
350
    for key_a, value_a in six.iteritems(snapshot_a):
351
        # skip fieds starting with _ or __
352
        if key_a.startswith("_"):
353
            continue
354
        # get the value of the second snapshot
355
        value_b = snapshot_b.get(key_a)
356
        # get the diff between the two values
357
        diff = diff_values(value_a, value_b, raw=raw)
358
        if diff is not None:
359
            diffs[key_a] = diff
360
    return diffs
361
362
363
def compare_last_two_snapshots(obj, raw=False):
364
    """Helper to compare the last two snapshots directly
365
    """
366
367
    if get_snapshot_count(obj) < 2:
368
        return {}
369
370
    version = get_version(obj)
371
372
    snap1 = get_snapshot_by_version(obj, version - 1)
373
    snap2 = get_snapshot_by_version(obj, version)
374
375
    return compare_snapshots(snap1, snap2, raw=raw)
376
377
378
def diff_values(value_a, value_b, raw=False):
379
    """Returns a human-readable diff between two values
380
381
    TODO: Provide an adapter per content type for this task to enable a more
382
          specific diff between the values
383
384
    :param value_a: First value to compare
385
    :param value_b: Second value to compare
386
    :param raw: True to compare the raw values, e.g. UIDs
387
    :returns a list of diff tuples
388
    """
389
390
    if not raw:
391
        value_a = _process_value(value_a)
392
        value_b = _process_value(value_b)
393
394
    # No changes
395
    if value_a == value_b:
396
        return None
397
398
    diffs = []
399
    # N.B.: the choice for the tuple data structure is to enable in the future
400
    # more granular diffs, e.g. the changed values within a dictionary etc.
401
    diffs.append((value_a, value_b))
402
    return diffs
403
404
405
def _process_value(value):
406
    """Convert the value into a human readable diff string
407
    """
408
    if not value:
409
        value = _("Not set")
410
    # handle strings
411
    elif isinstance(value, six.string_types):
412
        # XXX: bad data, e.g. in AS Method field
413
        if value == "None":
414
            value = _("Not set")
415
        # 0 is detected as the portal UID
416
        elif value == "0":
417
            value = "0"
418
        # handle physical paths
419
        elif value.startswith("/"):
420
            # remove the portal path to reduce noise in virtual hostings
421
            portal_path = api.get_path(api.get_portal())
422
            value = value.replace(portal_path, "", 1)
423
        elif api.is_uid(value):
424
            value = _get_title_or_id_from_uid(value)
425
    # handle dictionaries
426
    elif isinstance(value, (dict)):
427
        value = json.dumps(sorted(value.items()), indent=1)
428
    # handle lists and tuples
429
    elif isinstance(value, (list, tuple)):
430
        value = sorted(map(_process_value, value))
431
        value = "; ".join(value)
432
    # handle unicodes
433
    if isinstance(value, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable unicode does not seem to be defined.
Loading history...
434
        value = api.safe_unicode(value).encode("utf8")
435
    return str(value)
436
437
438
def _get_title_or_id_from_uid(uid):
439
    """Returns the title or ID from the given UID
440
    """
441
    try:
442
        obj = api.get_object_by_uid(uid)
443
    except api.APIError:
444
        obj = None
445
    if not obj:
446
        return "<Deleted {}>".format(uid)
447
    title_or_id = api.get_title(obj) or api.get_id(obj)
448
    return title_or_id
449
450
451
def disable_snapshots(obj):
452
    """Disable and removes all snapshots from the given object
453
    """
454
    # do not take more snapshots
455
    alsoProvides(obj, IDoNotSupportSnapshots)
456
457
    # do not display audit log
458
    noLongerProvides(obj, IAuditable)
459
460
    # remove all snapshots
461
    annotation = IAnnotations(obj)
462
    storage = annotation.get(SNAPSHOT_STORAGE)
463
    if storage:
464
        del(annotation[SNAPSHOT_STORAGE])
465