Passed
Push — master ( f38d56...9d71bf )
by Jordi
05:50
created

build.bika.lims.api.snapshot.compare_snapshots()   B

Complexity

Conditions 6

Size

Total Lines 24
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 24
rs 8.6666
c 0
b 0
f 0
cc 6
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
23
from bika.lims import _
24
from bika.lims import api
25
from bika.lims import logger
26
from bika.lims.api.security import get_roles
27
from bika.lims.api.security import get_user_id
28
from bika.lims.interfaces import IAuditable
29
from bika.lims.interfaces import IDoNotSupportSnapshots
30
from DateTime import DateTime
31
from persistent.list import PersistentList
32
from plone.memoize.ram import cache
33
from senaite.core.supermodel import SuperModel
34
from zope.annotation.interfaces import IAnnotatable
35
from zope.annotation.interfaces import IAnnotations
36
from zope.interface import alsoProvides
37
38
SNAPSHOT_STORAGE = "senaite.core.snapshots"
39
40
41
def _objectdata_cache_key(func, obj):
42
    """Cache Key for object data
43
    """
44
    uid = api.get_uid(obj)
45
    modified = api.get_modification_date(obj).millis()
46
    review_state = api.get_review_status(obj)
47
    return "{}-{}-{}".format(uid, review_state, modified)
48
49
50
def supports_snapshots(obj):
51
    """Checks if the object supports snapshots
52
53
    Only objects which can hold an annotation storage can be auditable
54
55
    :param obj: Content object
56
    :returns: True/False
57
    """
58
    if IDoNotSupportSnapshots.providedBy(obj):
59
        return False
60
    return IAnnotatable.providedBy(obj)
61
62
63
def get_storage(obj):
64
    """Get or create the audit log storage for the given object
65
66
    :param obj: Content object
67
    :returns: PersistentList
68
    """
69
    annotation = IAnnotations(obj)
70
    if annotation.get(SNAPSHOT_STORAGE) is None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SNAPSHOT_STORAGE does not seem to be defined.
Loading history...
71
        annotation[SNAPSHOT_STORAGE] = PersistentList()
72
    return annotation[SNAPSHOT_STORAGE]
73
74
75
def get_snapshots(obj):
76
    """Get all snapshots from the storage
77
78
    :param obj: Content object
79
    :returns: List of snapshot dictionaries
80
    """
81
    snapshots = get_storage(obj)
82
    return map(json.loads, snapshots)
83
84
85
def has_snapshots(obj):
86
    """Checks if the object has snapshots
87
88
    :param obj: Content object
89
    :returns: True/False
90
    """
91
    storage = get_storage(obj)
92
    return len(storage) > 0
93
94
95
def get_snapshot_count(obj):
96
    """Returns the number of snapsots
97
98
    :param obj: Content object
99
    :returns: Current snapshots in the storage
100
    """
101
    try:
102
        annotation = IAnnotations(obj)
103
    except TypeError:
104
        return 0
105
    storage = annotation.get(SNAPSHOT_STORAGE, [])
106
    return len(storage)
107
108
109
def get_version(obj):
110
    """Returns the version of the object
111
112
    NOTE: Object versions start with 0!
113
114
    :param obj: Content object
115
    :returns: Current version of the object or -1
116
    """
117
    count = get_snapshot_count(obj)
118
    if count == 0:
119
        return -1
120
    return count - 1
121
122
123
def get_snapshot_by_version(obj, version=0):
124
    """Get a snapshot by version
125
126
    Snapshot versions begin with `0`, because this is the first index of the
127
    storage, which is a list.
128
129
    :param obj: Content object
130
    :param version: The index position of the snapshot in the storage
131
    :returns: Snapshot at the given index position
132
    """
133
    if version < 0:
134
        return None
135
    snapshots = get_snapshots(obj)
136
    if version > len(snapshots) - 1:
137
        return None
138
    return snapshots[version]
139
140
141
def get_snapshot_version(obj, snapshot):
142
    """Returns the version of the given snapshot
143
144
    :param obj: Content object
145
    :param snapshot: Snapshot dictionary
146
    :returns: Index where the object is lcated
147
    """
148
    snapshots = get_snapshots(obj)
149
    return snapshots.index(snapshot)
150
151
152
def get_last_snapshot(obj):
153
    """Get the last snapshot
154
155
    :param obj: Content object
156
    :returns: Last Snapshot or None
157
    """
158
    version = get_version(obj)
159
    return get_snapshot_by_version(obj, version)
160
161
162
def get_snapshot_metadata(snapshot):
163
    """Returns the snapshot metadata
164
165
    :param snapshot: Snapshot dictionary
166
    :returns: Metadata dictionary of the snapshot
167
    """
168
    return snapshot.get("__metadata__", {})
169
170
171
@cache(_objectdata_cache_key)
172
def get_object_data(obj):
173
    """Get object schema data
174
175
    NOTE: We RAM cache this data because it should only change when the object
176
    was modified!
177
178
    XXX: We need to set at least the modification date when we set fields in
179
    Ajax Listing when we take a snapshot there!
180
181
    :param obj: Content object
182
    :returns: Dictionary of extracted schema data
183
    """
184
185
    model = SuperModel(obj)
186
    try:
187
        data = model.to_dict()
188
    except Exception as exc:
189
        logger.error("Failed to get schema data for {}: {}"
190
                     .format(repr(obj), str(exc)))
191
        data = {}
192
193
    return data
194
195
196
def get_request_data(request=None):
197
    """Get request header/form data
198
199
    A typical request behind NGINX looks like this:
200
201
    {
202
        'CONNECTION_TYPE': 'close',
203
        'CONTENT_LENGTH': '52',
204
        'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=UTF-8',
205
        'GATEWAY_INTERFACE': 'CGI/1.1',
206
        'HTTP_ACCEPT': 'application/json, text/javascript, */*; q=0.01',
207
        'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br',
208
        'HTTP_ACCEPT_LANGUAGE': 'de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7',
209
        'HTTP_COOKIE': '_ga=GA1.2.1058345096.1522506452; ...',
210
        'HTTP_HOST': 'senaite.ridingbytes.com',
211
        'HTTP_ORIGIN': 'https://senaite.ridingbytes.com',
212
        'HTTP_REFERER': 'https://senaite.ridingbytes.com/clients/client-1/H2O-0054',
213
        'HTTP_USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
214
        'HTTP_X_FORWARDED_FOR': '93.238.47.95',
215
        'HTTP_X_REAL_IP': '93.238.47.95',
216
        'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
217
        'PATH_INFO': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot//@@API/update',
218
        'PATH_TRANSLATED': '/VirtualHostBase/https/senaite.ridingbytes.com/senaite/VirtualHostRoot/@@API/update',
219
        'QUERY_STRING': '',
220
        'REMOTE_ADDR': '127.0.0.1',
221
        'REQUEST_METHOD': 'POST',
222
        'SCRIPT_NAME': '',
223
        'SERVER_NAME': 'localhost',
224
        'SERVER_PORT': '8081',
225
        'SERVER_PROTOCOL': 'HTTP/1.0',
226
        'SERVER_SOFTWARE': 'Zope/(2.13.28, python 2.7.12, linux2) ZServer/1.1',
227
        'channel.creation_time': 1556086048
228
    }
229
230
    :param request: Request object
231
    :returns: Dictionary of extracted request header/form data
232
    """ # noqa
233
234
    if request is None:
235
        # get the request
236
        request = api.get_request()
237
238
    # Happens in the test runner
239
    if not request:
240
        return {}
241
242
    # Try to obtain the real IP address of the client
243
    forwarded_for = request.get_header("X_FORWARDED_FOR")
244
    real_ip = request.get_header("X_REAL_IP")
245
    remote_address = request.get_header("REMOTE_ADDR")
246
247
    return {
248
        "comments": request.form.get("comments", ""),
249
        "remote_address": forwarded_for or real_ip or remote_address,
250
        "user_agent": request.get_header("HTTP_USER_AGENT"),
251
        "referer": request.get_header("HTTP_REFERER"),
252
    }
253
254
255
def get_object_metadata(obj, **kw):
256
    """Get object metadata
257
258
    :param obj: Content object
259
    :returns: Dictionary of extracted object metadata
260
    """
261
262
    # inject metadata of volatile data
263
    metadata = {
264
        "actor": get_user_id(),
265
        "roles": get_roles(),
266
        "action": "",
267
        "review_state": api.get_review_status(obj),
268
        "active": api.is_active(obj),
269
        "snapshot_created": DateTime().ISO(),
270
        "modified": api.get_modification_date(obj).ISO(),
271
        "remote_address": "",
272
        "user_agent": "",
273
        "referer": "",
274
        "comments": "",
275
    }
276
277
    # Update request data
278
    metadata.update(get_request_data())
279
280
    # allow metadata overrides
281
    metadata.update(kw)
282
283
    return metadata
284
285
286
def take_snapshot(obj, store=True, **kw):
287
    """Takes a snapshot of the passed in object
288
289
    :param obj: Content object
290
    :returns: New snapshot
291
    """
292
    logger.debug("📷 Take new snapshot for {}".format(repr(obj)))
293
294
    # get the object data
295
    snapshot = get_object_data(obj)
296
297
    # get the metadata
298
    metadata = get_object_metadata(obj, **kw)
299
300
    # store the metadata
301
    snapshot["__metadata__"] = metadata
302
303
    # convert the snapshot to JSON
304
    data = json.dumps(snapshot)
305
306
    # return immediately
307
    if not store:
308
        return snapshot
309
310
    # get the snapshot storage
311
    storage = get_storage(obj)
312
313
    # store the snapshot data
314
    storage.append(data)
315
316
    # Mark the content as auditable
317
    alsoProvides(obj, IAuditable)
318
319
    return snapshot
320
321
322
def compare_snapshots(snapshot_a, snapshot_b, raw=False):
323
    """Returns a diff of two given snapshots (dictionaries)
324
325
    :param snapshot_a: First snapshot
326
    :param snapshot_b: Second snapshot
327
    :param raw: True to compare the raw values, e.g. UIDs
328
    :returns: Dictionary of field/value pairs that differ
329
    """
330
    if not all(map(lambda x: isinstance(x, dict),
331
                   [snapshot_a, snapshot_b])):
332
        return {}
333
334
    diffs = {}
335
    for key_a, value_a in snapshot_a.iteritems():
336
        # skip fieds starting with _ or __
337
        if key_a.startswith("_"):
338
            continue
339
        # get the value of the second snapshot
340
        value_b = snapshot_b.get(key_a)
341
        # get the diff between the two values
342
        diff = diff_values(value_a, value_b, raw=raw)
343
        if diff is not None:
344
            diffs[key_a] = diff
345
    return diffs
346
347
348
def compare_last_two_snapshots(obj, raw=False):
349
    """Helper to compare the last two snapshots directly
350
    """
351
352
    if get_snapshot_count(obj) < 2:
353
        return {}
354
355
    version = get_version(obj)
356
357
    snap1 = get_snapshot_by_version(obj, version - 1)
358
    snap2 = get_snapshot_by_version(obj, version)
359
360
    return compare_snapshots(snap1, snap2, raw=raw)
361
362
363
def diff_values(value_a, value_b, raw=False):
364
    """Returns a human-readable diff between two values
365
366
    TODO: Provide an adapter per content type for this task to enable a more
367
          specific diff between the values
368
369
    :param value_a: First value to compare
370
    :param value_b: Second value to compare
371
    :param raw: True to compare the raw values, e.g. UIDs
372
    :returns a list of diff tuples
373
    """
374
375
    if not raw:
376
        value_a = _process_value(value_a)
377
        value_b = _process_value(value_b)
378
379
    # No changes
380
    if value_a == value_b:
381
        return None
382
383
    diffs = []
384
    # N.B.: the choice for the tuple data structure is to enable in the future
385
    # more granular diffs, e.g. the changed values within a dictionary etc.
386
    diffs.append((value_a, value_b))
387
    return diffs
388
389
390
def _process_value(value):
391
    """Convert the value into a human readable diff string
392
    """
393
    if not value:
394
        value = _("Not set")
395
    # handle strings
396
    elif isinstance(value, basestring):
397
        # XXX: bad data, e.g. in AS Method field
398
        if value == "None":
399
            value = _("Not set")
400
        # 0 is detected as the portal UID
401
        elif value == "0":
402
            value = "0"
403
        # handle physical paths
404
        elif value.startswith("/"):
405
            # remove the portal path to reduce noise in virtual hostings
406
            portal_path = api.get_path(api.get_portal())
407
            value = value.replace(portal_path, "", 1)
408
        elif api.is_uid(value):
409
            value = _get_title_or_id_from_uid(value)
410
    # handle dictionaries
411
    elif isinstance(value, (dict)):
412
        value = json.dumps(sorted(value.items()), indent=1)
413
    # handle lists and tuples
414
    elif isinstance(value, (list, tuple)):
415
        value = sorted(map(_process_value, value))
416
        value = "; ".join(value)
417
    # handle unicodes
418
    if isinstance(value, unicode):
419
        value = api.safe_unicode(value).encode("utf8")
420
    return str(value)
421
422
423
def _get_title_or_id_from_uid(uid):
424
    """Returns the title or ID from the given UID
425
    """
426
    try:
427
        obj = api.get_object_by_uid(uid)
428
    except api.APIError:
429
        return "<Deleted {}>".format(uid)
430
    title_or_id = api.get_title(obj) or api.get_id(obj)
431
    return title_or_id
432