Passed
Push — 2.x ( d001d1...dcd810 )
by Ramon
07:36
created

bika.lims.api.snapshot.get_snapshot_created()   A

Complexity

Conditions 2

Size

Total Lines 15
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 15
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
23
import six
24
from bika.lims import _
25
from bika.lims import api
26
from bika.lims import logger
27
from bika.lims.api.security import get_roles
28
from bika.lims.api.security import get_user_id
29
from bika.lims.interfaces import IAuditable
30
from bika.lims.interfaces import IDoNotSupportSnapshots
31
from DateTime import DateTime
32
from persistent.list import PersistentList
33
from plone.memoize.ram import cache
34
from senaite.app.supermodel import SuperModel
35
from senaite.core.api import dtime
36
from zope.annotation.interfaces import IAnnotatable
37
from zope.annotation.interfaces import IAnnotations
38
from zope.interface import alsoProvides
39
from zope.interface import noLongerProvides
40
41
# Annotation key under which the snapshots of an object are stored
SNAPSHOT_STORAGE = "senaite.core.snapshots"
42
43
44
def _objectdata_cache_key(func, obj):
    """Build the cache key for the extracted object data

    The key combines the UID of the object, its review state and the
    `millis()` value of its modification date.

    :param func: The decorated function (not used to build the key)
    :param obj: Content object
    :returns: Cache key string
    """
    uid = api.get_uid(obj)
    modified = api.get_modification_date(obj).millis()
    state = api.get_review_status(obj)
    key_parts = (uid, state, modified)
    return "{}-{}-{}".format(*key_parts)
51
52
53
def supports_snapshots(obj):
    """Tell whether snapshots can be taken for the given object

    The object must be annotatable and must not be flagged with the
    `IDoNotSupportSnapshots` marker interface.

    :param obj: Content object
    :returns: True/False
    """
    excluded = IDoNotSupportSnapshots.providedBy(obj)
    # the annotatable check is only evaluated for non-excluded objects
    return not excluded and IAnnotatable.providedBy(obj)
64
65
66
def get_storage(obj):
    """Return the snapshot storage of the object, creating it on demand

    NOTE: An empty storage is written to the annotations of the object if
    none exists yet.

    :param obj: Content object
    :returns: PersistentList
    """
    annotations = IAnnotations(obj)
    storage = annotations.get(SNAPSHOT_STORAGE)
    if storage is None:
        # first access: initialize an empty storage
        annotations[SNAPSHOT_STORAGE] = PersistentList()
        storage = annotations[SNAPSHOT_STORAGE]
    return storage
76
77
78
def get_snapshots(obj):
    """Get all snapshots from the storage

    Each entry of the storage is decoded with `json.loads`. A real list is
    returned on Python 2 and on Python 3 (where `map` is lazy), so that
    callers can safely use `len()`, indexing and `.index()` on the result.

    :param obj: Content object
    :returns: List of snapshot dictionaries
    """
    return [json.loads(snapshot) for snapshot in get_storage(obj)]
86
87
88
def has_snapshots(obj):
    """Checks if the object has snapshots

    NOTE: This is a read-only check based on `get_snapshot_count`. It does
    not create an empty snapshot storage on the object and it returns False
    for objects that cannot be adapted to `IAnnotations`.

    :param obj: Content object
    :returns: True/False
    """
    return get_snapshot_count(obj) > 0
96
97
98
def get_snapshot_count(obj):
    """Returns the number of snapshots

    The storage is only read and never created by this function.

    :param obj: Content object
    :returns: Number of snapshots in the storage, or 0 if the object cannot
              be adapted to `IAnnotations`
    """
    try:
        annotations = IAnnotations(obj)
    except TypeError:
        # the object could not be adapted
        return 0
    return len(annotations.get(SNAPSHOT_STORAGE, []))
110
111
112
def get_version(obj):
    """Returns the current version of the object

    NOTE: Versions are zero-based, i.e. the first snapshot is version 0!

    :param obj: Content object
    :returns: Index of the latest snapshot or -1 if there are no snapshots
    """
    return get_snapshot_count(obj) - 1
122
123
124
def get_snapshot_by_version(obj, version=0):
    """Look up a snapshot by its version

    The version is the index position of the snapshot in the storage list
    and therefore starts with `0`.

    :param obj: Content object
    :param version: The index position of the snapshot in the storage
    :returns: Snapshot at the given index position or None if the version
              is out of range
    """
    # negative versions never exist (no lookup from the end of the list)
    if version < 0:
        return None
    snapshots = get_snapshots(obj)
    last_index = len(snapshots) - 1
    return snapshots[version] if version <= last_index else None
140
141
142
def get_snapshot_version(obj, snapshot):
    """Returns the version of the given snapshot

    :param obj: Content object
    :param snapshot: Snapshot dictionary
    :returns: Index of the snapshot in the storage or -1 if it is not found
    """
    snapshots = get_snapshots(obj)
    try:
        return snapshots.index(snapshot)
    except ValueError:
        # the snapshot is not part of the storage
        return -1
153
154
155
def get_last_snapshot(obj):
    """Returns the most recent snapshot of the object

    :param obj: Content object
    :returns: Last Snapshot or None
    """
    return get_snapshot_by_version(obj, get_version(obj))
163
164
165
def get_snapshot_created(snapshot):
    """Returns the creation date of the snapshot

    The `timestamp` of the snapshot metadata is preferred. The
    `snapshot_created` metadata value is only used when no timestamp is set.

    :param snapshot: Snapshot dictionary
    :returns: Result of `dtime.to_DT` for the snapshot creation date
    """
    metadata = get_snapshot_metadata(snapshot)

    timestamp = metadata.get("timestamp")
    if timestamp:
        created = dtime.from_timestamp(timestamp)
    else:
        created = metadata.get("snapshot_created")

    return dtime.to_DT(created)
180
181
182
def get_snapshot_metadata(snapshot):
    """Extracts the metadata from the snapshot

    :param snapshot: Snapshot dictionary
    :returns: Dictionary stored under the `__metadata__` key of the
              snapshot, or an empty dictionary if the key is missing
    """
    metadata = snapshot.get("__metadata__", {})
    return metadata
189
190
191
@cache(_objectdata_cache_key)
def get_object_data(obj):
    """Extract the schema data of the object

    NOTE: The result is RAM cached, because the data should only change
    when the object was modified!

    XXX: We need to set at least the modification date when we set fields in
    Ajax Listing when we take a snapshot there!

    :param obj: Content object
    :returns: Dictionary of extracted schema data, or an empty dictionary
              if the extraction failed
    """
    try:
        return SuperModel(obj).to_dict()
    except Exception as exc:
        # best effort: log the failure and fall back to empty data
        logger.error("Failed to get schema data for {}: {}"
                     .format(repr(obj), str(exc)))
    return {}
214
215
216
def get_request_data(request=None):
    """Extract header and form data from the request

    The current request is looked up if no request is passed in.

    A typical request behind NGINX looks like this:

    {
        'CONNECTION_TYPE': 'close',
        'CONTENT_LENGTH': '52',
        'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=UTF-8',
        'GATEWAY_INTERFACE': 'CGI/1.1',
        'HTTP_ACCEPT': 'application/json, text/javascript, */*; q=0.01',
        'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br',
        'HTTP_ACCEPT_LANGUAGE': 'de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
        'HTTP_COOKIE': '_ga=GA1.2.1058345096.1522506452; ...',
        'HTTP_HOST': 'senaite.ridingbytes.com',
        'HTTP_ORIGIN': 'https://senaite.ridingbytes.com',
        'HTTP_REFERER': 'https://senaite.ridingbytes.com/clients/client-1/H2O-0054',
        'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
        'HTTP_X_FORWARDED_FOR': '93.238.47.95',
        'HTTP_X_REAL_IP': '93.238.47.95',
        'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
        'PATH_INFO': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot//@@API/update',
        'PATH_TRANSLATED': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot/@@API/update',
        'QUERY_STRING': '',
        'REMOTE_ADDR': '127.0.0.1',
        'REQUEST_METHOD': 'POST',
        'SCRIPT_NAME': '',
        'SERVER_NAME': 'localhost',
        'SERVER_PORT': '8081',
        'SERVER_PROTOCOL': 'HTTP/1.0',
        'SERVER_SOFTWARE': 'Zope/(2.13.28, python 2.7.12, linux2) ZServer/1.1',
        'channel.creation_time': 1556086048
    }

    :param request: Request object
    :returns: Dictionary with the keys `comments`, `remote_address`,
              `user_agent` and `referer`, or an empty dictionary if no
              request is available
    """
    if request is None:
        # fall back to the current request
        request = api.get_request()

    # Happens in the test runner
    if not request:
        return {}

    # Candidates for the real IP address of the client, in order of
    # preference
    forwarded_for, real_ip, remote_addr = map(
        request.get_header,
        ("X_FORWARDED_FOR", "X_REAL_IP", "REMOTE_ADDR"))

    data = {}
    data["comments"] = request.form.get("comments", "")
    data["remote_address"] = forwarded_for or real_ip or remote_addr
    data["user_agent"] = request.get_header("HTTP_USER_AGENT")
    data["referer"] = request.get_header("HTTP_REFERER")
    return data
273
274
275
def get_object_metadata(obj, **kw):
    """Collect the metadata for a snapshot of the object

    The defaults are updated with the data of the current request first and
    with the passed in keywords last, i.e. keywords always win.

    :param obj: Content object
    :param kw: Metadata overrides
    :returns: Dictionary of extracted object metadata
    """
    now = DateTime()

    # inject metadata of volatile data
    metadata = dict(
        actor=get_user_id(),
        roles=get_roles(),
        action="",
        review_state=api.get_review_status(obj),
        active=api.is_active(obj),
        snapshot_created=now.ISO(),
        timestamp=dtime.to_timestamp(now),
        modified=api.get_modification_date(obj).ISO(),
        remote_address="",
        user_agent="",
        referer="",
        comments="",
    )

    # request data first, explicit overrides last
    for extra in (get_request_data(), kw):
        metadata.update(extra)

    return metadata
306
307
308
def take_snapshot(obj, store=True, **kw):
    """Creates a new snapshot of the passed in object

    The snapshot consists of the object data plus the metadata, which is
    kept under the `__metadata__` key.

    :param obj: Content object
    :param store: False to only build and return the snapshot without
                  appending it to the storage of the object
    :param kw: Metadata overrides, passed on to `get_object_metadata`
    :returns: New snapshot
    """
    logger.debug("📷 Take new snapshot for {}".format(repr(obj)))

    # the object data together with the metadata forms the snapshot
    snapshot = get_object_data(obj)
    snapshot["__metadata__"] = get_object_metadata(obj, **kw)

    # NOTE: the snapshot is serialized to JSON even if it is not stored
    serialized = json.dumps(snapshot)

    if store:
        # append the JSON data to the snapshot storage
        get_storage(obj).append(serialized)
        # Mark the content as auditable
        alsoProvides(obj, IAuditable)

    return snapshot
342
343
344
def pause_snapshots_for(obj):
    """Pause snapshots for the given object

    Flags the object with the `IDoNotSupportSnapshots` marker interface.

    :param obj: Content object
    """
    alsoProvides(obj, IDoNotSupportSnapshots)
348
349
350
def resume_snapshots_for(obj):
    """Resume snapshots for the given object

    Removes the `IDoNotSupportSnapshots` marker interface from the object.

    :param obj: Content object
    """
    try:
        noLongerProvides(obj, IDoNotSupportSnapshots)
    except ValueError:
        # "Can only remove directly provided interfaces." is raised when
        # the interface was directly provided on class level. Nothing can
        # be removed from the object itself in this case.
        return
359
360
361
def compare_snapshots(snapshot_a, snapshot_b, raw=False):
    """Returns a diff of two given snapshots (dictionaries)

    Only the keys of the first snapshot are compared. Keys starting with an
    underscore, e.g. `__metadata__`, are skipped.

    :param snapshot_a: First snapshot
    :param snapshot_b: Second snapshot
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: Dictionary of field/value pairs that differ, or an empty
              dictionary if one of the snapshots is not a dictionary
    """
    if not (isinstance(snapshot_a, dict) and isinstance(snapshot_b, dict)):
        return {}

    diffs = {}
    for field, value_a in six.iteritems(snapshot_a):
        # skip fields starting with _ or __
        if field.startswith("_"):
            continue
        # diff against the value of the second snapshot
        diff = diff_values(value_a, snapshot_b.get(field), raw=raw)
        if diff is None:
            # both values are equal
            continue
        diffs[field] = diff
    return diffs
385
386
387
def compare_last_two_snapshots(obj, raw=False):
    """Helper to diff the two most recent snapshots of the object

    :param obj: Content object
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: Dictionary of field/value pairs that differ, or an empty
              dictionary if the object has less than two snapshots
    """
    if get_snapshot_count(obj) < 2:
        return {}

    latest = get_version(obj)
    older, newer = [
        get_snapshot_by_version(obj, version)
        for version in (latest - 1, latest)
    ]
    return compare_snapshots(older, newer, raw=raw)
400
401
402
def diff_values(value_a, value_b, raw=False):
    """Returns a human-readable diff between two values

    TODO: Provide an adapter per content type for this task to enable a more
          specific diff between the values

    :param value_a: First value to compare
    :param value_b: Second value to compare
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: None if the values are equal, otherwise a list of diff tuples
    """
    if not raw:
        # compare the human readable representations
        value_a, value_b = _process_value(value_a), _process_value(value_b)

    # No changes
    if value_a == value_b:
        return None

    # N.B.: the choice for the tuple data structure is to enable in the future
    # more granular diffs, e.g. the changed values within a dictionary etc.
    return [(value_a, value_b)]
427
428
429
def _process_value(value):
    """Convert the value into a human readable diff string

    - Empty values and the string "None" become the "Not set" message
    - Physical paths get the portal path removed
    - UIDs are resolved to the title or ID of the referenced object
    - Dictionaries are dumped as JSON of their sorted items
    - Lists and tuples are processed per item, sorted and joined with "; "

    :param value: Value of a snapshot field
    :returns: String representation of the value
    """
    if not value:
        value = _("Not set")
    # handle strings
    elif isinstance(value, six.string_types):
        # XXX: bad data, e.g. in AS Method field
        if value == "None":
            value = _("Not set")
        # 0 is detected as the portal UID
        elif value == "0":
            value = "0"
        # handle physical paths
        elif value.startswith("/"):
            # remove the portal path to reduce noise in virtual hostings
            portal_path = api.get_path(api.get_portal())
            value = value.replace(portal_path, "", 1)
        elif api.is_uid(value):
            value = _get_title_or_id_from_uid(value)
    # handle dictionaries
    elif isinstance(value, dict):
        value = json.dumps(sorted(value.items()), indent=1)
    # handle lists and tuples
    elif isinstance(value, (list, tuple)):
        value = "; ".join(sorted(map(_process_value, value)))
    # handle unicodes (Python 2 only): encode to UTF-8, because `str()` would
    # fail for non-ASCII unicode values. The bare `unicode` name does not
    # exist on Python 3, where text is already `str` and must not be
    # converted to bytes.
    if six.PY2 and isinstance(value, six.text_type):
        value = api.safe_unicode(value).encode("utf8")
    return str(value)
460
461
462
def _get_title_or_id_from_uid(uid):
    """Resolve the UID to the title or ID of the referenced object

    :param uid: UID of the object to look up
    :returns: Title of the object, its ID if the title is empty, or a
              "<Deleted UID>" placeholder if no object was found
    """
    try:
        obj = api.get_object_by_uid(uid)
    except api.APIError:
        obj = None
    if obj:
        return api.get_title(obj) or api.get_id(obj)
    return "<Deleted {}>".format(uid)
473
474
475
def disable_snapshots(obj):
    """Disables snapshots and removes all existing ones from the object

    NOTE: An existing but empty storage is left in place.

    :param obj: Content object
    """
    # do not take more snapshots
    alsoProvides(obj, IDoNotSupportSnapshots)

    # do not display audit log
    noLongerProvides(obj, IAuditable)

    # remove all snapshots
    annotations = IAnnotations(obj)
    if annotations.get(SNAPSHOT_STORAGE):
        del annotations[SNAPSHOT_STORAGE]
489