Passed
Push — master ( 804f45...1fa2c0 )
by Jordi
04:08
created

build.bika.lims.api.snapshot.compare_snapshots()   B

Complexity

Conditions 6

Size

Total Lines 24
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 24
rs 8.6666
c 0
b 0
f 0
cc 6
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import json
22
23
from bika.lims import _
24
from bika.lims import api
25
from bika.lims import logger
26
from bika.lims.api.security import get_roles
27
from bika.lims.api.security import get_user_id
28
from bika.lims.interfaces import IAuditable
29
from DateTime import DateTime
30
from persistent.list import PersistentList
31
from plone.memoize.ram import cache
32
from senaite.core.supermodel import SuperModel
33
from zope.annotation.interfaces import IAnnotatable
34
from zope.annotation.interfaces import IAnnotations
35
from zope.interface import alsoProvides
36
37
# Annotation key under which the snapshot storage (a list of JSON encoded
# snapshots) is kept on a content object
SNAPSHOT_STORAGE = "senaite.core.snapshots"
38
39
40
def _objectdata_cache_key(func, obj):
    """Build the RAM cache key for the schema data of an object

    The key is made of the UID, the review state and the modification
    timestamp (milliseconds) of the object, so that a cached entry is no
    longer hit once one of these values changes.

    :param func: Accepted to match the call signature of the cache key
                 function, but not used here
    :param obj: Content object
    :returns: Cache key in the form `<uid>-<review_state>-<modified millis>`
    """
    uid = api.get_uid(obj)
    modified_millis = api.get_modification_date(obj).millis()
    state = api.get_review_status(obj)
    key_parts = (uid, state, modified_millis)
    return "{}-{}-{}".format(*key_parts)
47
48
49
def supports_snapshots(obj):
    """Tell whether snapshots can be taken for the given object

    Snapshots are kept in the annotation storage of the object, therefore
    only objects providing `IAnnotatable` can be audited.

    :param obj: Content object
    :returns: True if the object provides `IAnnotatable`, otherwise False
    """
    is_annotatable = IAnnotatable.providedBy(obj)
    return is_annotatable
58
59
60
def get_storage(obj):
    """Return the snapshot storage of the object, creating it on demand

    NOTE: A new (empty) storage is written to the annotations of the object
    if none exists yet.

    :param obj: Content object
    :returns: PersistentList holding the JSON encoded snapshots
    """
    annotations = IAnnotations(obj)
    storage = annotations.get(SNAPSHOT_STORAGE)
    if storage is None:
        # first access: initialize an empty storage
        storage = PersistentList()
        annotations[SNAPSHOT_STORAGE] = storage
    return storage
70
71
72
def get_snapshots(obj):
    """Get all snapshots from the storage

    The snapshots are stored JSON encoded and are decoded here into
    dictionaries. The order of the returned list matches the order of the
    storage, i.e. the list index of a snapshot is its version.

    :param obj: Content object
    :returns: List of snapshot dictionaries
    """
    # Build a real list (and not a lazy `map` object), because callers rely
    # on `len()`, indexing and `.index()` of the returned value
    return [json.loads(snapshot) for snapshot in get_storage(obj)]
80
81
82
def has_snapshots(obj):
    """Checks if the object has snapshots

    This is a read-only check: it relies on `get_snapshot_count`, which does
    not create the snapshot storage and returns 0 for objects that can not
    be adapted to annotations.

    :param obj: Content object
    :returns: True if at least one snapshot is stored, otherwise False
    """
    return get_snapshot_count(obj) > 0
90
91
92
def get_snapshot_count(obj):
    """Returns the number of snapshots

    No storage is created by this function. Objects for which the
    annotation lookup raises a `TypeError` are reported with 0 snapshots.

    :param obj: Content object
    :returns: Number of snapshots currently kept in the storage
    """
    try:
        annotations = IAnnotations(obj)
    except TypeError:
        # no annotations available for this object
        return 0
    return len(annotations.get(SNAPSHOT_STORAGE, []))
104
105
106
def get_version(obj):
    """Returns the current version of the object

    NOTE: Object versions start with 0!

    The version is the index of the last snapshot in the storage, i.e. the
    number of snapshots minus one. This results in -1 for objects without
    any snapshot.

    :param obj: Content object
    :returns: Current version of the object or -1
    """
    return get_snapshot_count(obj) - 1
118
119
120
def get_snapshot_by_version(obj, version=0):
    """Get a snapshot by version

    Snapshot versions begin with `0`, because this is the first index of the
    storage, which is a list.

    :param obj: Content object
    :param version: The index position of the snapshot in the storage
    :returns: Snapshot at the given index position or None if the version
              is negative or beyond the last stored snapshot
    """
    if version < 0:
        return None
    storage = get_storage(obj)
    if version > len(storage) - 1:
        return None
    # decode only the requested snapshot instead of the whole storage
    return json.loads(storage[version])
136
137
138
def get_snapshot_version(obj, snapshot):
    """Returns the version of the given snapshot

    The lookup is done by equality against the decoded snapshots of the
    object.

    :param obj: Content object
    :param snapshot: Snapshot dictionary
    :returns: Index where the snapshot is located in the storage
    :raises ValueError: if the snapshot is not found (`list.index`)
    """
    all_snapshots = get_snapshots(obj)
    position = all_snapshots.index(snapshot)
    return position
147
148
149
def get_last_snapshot(obj):
    """Get the most recent snapshot of the object

    Objects without snapshots have the version -1, for which the lookup by
    version returns None.

    :param obj: Content object
    :returns: Last Snapshot or None
    """
    return get_snapshot_by_version(obj, get_version(obj))
157
158
159
def get_snapshot_metadata(snapshot):
    """Extract the metadata of a snapshot

    The metadata is kept below the `__metadata__` key of the snapshot.

    :param snapshot: Snapshot dictionary
    :returns: Metadata dictionary of the snapshot, or an empty dictionary
              if the snapshot holds no metadata
    """
    metadata = snapshot.get("__metadata__", {})
    return metadata
166
167
168
@cache(_objectdata_cache_key)
def get_object_data(obj):
    """Extract the schema data of the object

    NOTE: The result is RAM cached with a key built from the UID, the review
    state and the modification date of the object, because the data should
    only change when the object was modified!

    XXX: We need to set at least the modification date when we set fields in
    Ajax Listing when we take a snapshot there!

    Any error raised during the extraction is logged and an empty
    dictionary is returned instead.

    :param obj: Content object
    :returns: Dictionary of extracted schema data
    """
    model = SuperModel(obj)
    try:
        return model.to_dict()
    except Exception as exc:
        # best effort: log the failure and continue without schema data
        message = "Failed to get schema data for {}: {}".format(
            repr(obj), str(exc))
        logger.error(message)
        return {}
191
192
193
def get_request_data(request=None):
    """Extract audit relevant header/form data from the request

    :param request: Request object. The current request is looked up if
                    omitted
    :returns: Dictionary with the keys `comments`, `remote_address`,
              `user_agent` and `referer`, or an empty dictionary if no
              request is available
    """
    if request is None:
        # fall back to the current request
        request = api.get_request()

    # Happens in the test runner
    if not request:
        return {}

    data = {"comments": request.form.get("comments", "")}
    header_names = (
        ("remote_address", "REMOTE_ADDR"),
        ("user_agent", "HTTP_USER_AGENT"),
        ("referer", "HTTP_REFERER"),
    )
    for key, header in header_names:
        data[key] = request.get_header(header)
    return data
214
215
216
def get_object_metadata(obj, **kw):
    """Collect the metadata to be stored together with a snapshot

    The metadata consists of volatile information (current user and roles,
    review state, timestamps), which is then updated with the data of the
    current request and finally with the passed in keywords.

    :param obj: Content object
    :param kw: Additional metadata. Takes precedence over the collected
               values
    :returns: Dictionary of extracted object metadata
    """
    # volatile data of the current user and the object
    actor = get_user_id()
    roles = get_roles()
    review_state = api.get_review_status(obj)
    active = api.is_active(obj)
    created = DateTime().ISO()
    modified = api.get_modification_date(obj).ISO()

    metadata = {
        "actor": actor,
        "roles": roles,
        "action": "",
        "review_state": review_state,
        "active": active,
        "snapshot_created": created,
        "modified": modified,
        "remote_address": "",
        "user_agent": "",
        "referer": "",
        "comments": "",
    }

    # request data first, explicit overrides last
    for extra in (get_request_data(), kw):
        metadata.update(extra)

    return metadata
245
246
247
def take_snapshot(obj, store=True, **kw):
    """Takes a snapshot of the passed in object

    The snapshot consists of the schema data of the object plus the
    metadata, which is kept below the `__metadata__` key. If the snapshot
    gets stored, the object is also marked with `IAuditable`.

    :param obj: Content object
    :param store: True to append the JSON encoded snapshot to the snapshot
                  storage of the object, False to only return it
    :param kw: Additional metadata, see `get_object_metadata`
    :returns: New snapshot
    """
    logger.debug("📷 Take new snapshot for {}".format(repr(obj)))

    # Work on a copy of the object data: `get_object_data` is RAM cached and
    # adding the metadata to the returned dictionary directly might modify
    # the cached value and snapshots handed out earlier.
    # NOTE(review): depends on the cache returning the same object - confirm
    snapshot = dict(get_object_data(obj))

    # store the metadata
    snapshot["__metadata__"] = get_object_metadata(obj, **kw)

    # convert the snapshot to JSON (done before the `store` check, so that
    # unserializable data fails in both cases)
    data = json.dumps(snapshot)

    # return immediately
    if not store:
        return snapshot

    # store the snapshot data
    get_storage(obj).append(data)

    # Mark the content as auditable
    alsoProvides(obj, IAuditable)

    return snapshot
281
282
283
def compare_snapshots(snapshot_a, snapshot_b, raw=False):
    """Returns a diff of two given snapshots (dictionaries)

    Only the keys of the first snapshot are compared. Keys starting with an
    underscore, e.g. `__metadata__`, are skipped. Keys which are only
    contained in the second snapshot are not reported.

    :param snapshot_a: First snapshot
    :param snapshot_b: Second snapshot
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: Dictionary of field/value pairs that differ. Empty if one of
              the snapshots is not a dictionary
    """
    if not (isinstance(snapshot_a, dict) and isinstance(snapshot_b, dict)):
        return {}

    diffs = {}
    # `items()` is available in Python 2 and 3 (`iteritems()` is not)
    for key, value_a in snapshot_a.items():
        # skip fields starting with _ or __
        if key.startswith("_"):
            continue
        # get the diff between the values of both snapshots
        diff = diff_values(value_a, snapshot_b.get(key), raw=raw)
        if diff is not None:
            diffs[key] = diff
    return diffs
307
308
309
def compare_last_two_snapshots(obj, raw=False):
    """Helper to compare the last two snapshots of the object directly

    :param obj: Content object
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: Dictionary of field/value pairs that differ between the
              previous and the latest snapshot. Empty if the object has
              less than two snapshots
    """
    # at least two snapshots are needed for a comparison
    if get_snapshot_count(obj) < 2:
        return {}

    latest = get_version(obj)
    previous_snapshot = get_snapshot_by_version(obj, latest - 1)
    latest_snapshot = get_snapshot_by_version(obj, latest)

    return compare_snapshots(previous_snapshot, latest_snapshot, raw=raw)
322
323
324
def diff_values(value_a, value_b, raw=False):
    """Returns a human-readable diff between two values

    Unless `raw` is set, both values are converted to their human readable
    representation before they are compared.

    :param value_a: First value to compare
    :param value_b: Second value to compare
    :param raw: True to compare the raw values, e.g. UIDs
    :returns: A list of diff tuples or None if the values are equal
    """
    if raw:
        left, right = value_a, value_b
    else:
        left = _process_value(value_a)
        right = _process_value(value_b)

    # No changes
    if left == right:
        return None

    # N.B.: the choice for the tuple data structure is to enable in the future
    # more granular diffs, e.g. the changed values within a dictionary etc.
    return [(left, right)]
346
347
348
def _process_value(value):
    """Convert the value into a human readable diff string

    The checks are done in this order:

    - empty values and the string "None" are displayed as "Not set"
    - the string "0" is kept as it is
    - UIDs are replaced by the title or ID of the referenced object
    - dictionaries are dumped as JSON list of their sorted items
    - lists/tuples are processed per item, sorted and joined with "; "
    - unicode values are encoded to UTF-8

    :param value: Value to convert
    :returns: String representation of the value
    """
    # empty values, or bad data ("None"), e.g. in AS Method field
    if not value or value == "None":
        return str(_("Not set"))
    # 0 is detected as the portal UID
    if value == "0":
        return str(value)
    if api.is_uid(value):
        return str(_get_title_or_id_from_uid(value))
    if isinstance(value, dict):
        return str(json.dumps(sorted(value.items()), indent=1))
    if isinstance(value, (list, tuple)):
        items = sorted([_process_value(item) for item in value])
        return str("; ".join(items))
    if isinstance(value, unicode):
        return str(api.safe_unicode(value).encode("utf8"))
    return str(value)
369
370
371
def _get_title_or_id_from_uid(uid):
    """Resolve the UID to the title or ID of the referenced object

    :param uid: UID of the object to look up
    :returns: Title of the object, or its ID if the title is empty. If the
              lookup fails with an `APIError`, the string `<Deleted UID>`
              is returned instead
    """
    try:
        obj = api.get_object_by_uid(uid)
    except api.APIError:
        return "<Deleted {}>".format(uid)

    title = api.get_title(obj)
    if title:
        return title
    # fall back to the ID for objects without a title
    return api.get_id(obj)
380