Passed
Pull Request — 2.x (#1872)
by Ramon
05:35
created

senaite.core.catalog.indexer.auditlog   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 30
eloc 107
dl 0
loc 183
rs 10
c 0
b 0
f 0

14 Functions

Rating   Name   Duplication   Size   Complexity  
A _uid_to_title_cache_key() 0 6 2
A get_meta_value_for() 0 5 1
A snapshot_created() 0 7 1
D listing_searchable_text() 0 50 12
A modifiers() 0 6 1
A get_title_or_id_from_uid() 0 9 2
A get_created() 0 7 2
A get_fullname() 0 6 1
A get_actor() 0 7 2
A fullname() 0 6 1
A snapshot_version() 0 5 1
A get_action() 0 7 2
A actor() 0 6 1
A action() 0 6 1
1
# -*- coding: utf-8 -*-
2
3
import itertools
4
import re
5
6
import six
7
8
from bika.lims import api
9
from bika.lims.api.snapshot import get_last_snapshot
10
from bika.lims.api.snapshot import get_snapshot_count
11
from bika.lims.api.snapshot import get_snapshot_metadata
12
from bika.lims.api.snapshot import get_snapshots
13
from bika.lims.api.user import get_user_id
14
from bika.lims.interfaces import IAuditable
15
from plone.indexer import indexer
16
from plone.memoize.ram import DontCache
17
from plone.memoize.ram import cache
18
from senaite.core.interfaces import IAuditlogCatalog
19
20
UID_RX = re.compile(r"[a-z0-9]{32}$")
21
DATE_RX = re.compile(r"\d{4}[-/]\d{2}[-/]\d{2}")
22
23
24
def _uid_to_title_cache_key(func, uid):
25
    brain = api.get_brain_by_uid(uid, default=None)
26
    if brain is None:
27
        raise DontCache
28
    modified = api.get_modification_date(brain).millis()
29
    return "{}-{}".format(uid, modified)
30
31
32
@cache(_uid_to_title_cache_key)
33
def get_title_or_id_from_uid(uid):
34
    """Returns the title or ID from the given UID
35
    """
36
    obj = api.get_object_by_uid(uid, default=None)
37
    if obj is None:
38
        return ""
39
    title_or_id = api.get_title(obj) or api.get_id(obj)
40
    return title_or_id
41
42
43
def get_meta_value_for(snapshot, key, default=None):
44
    """Returns the metadata value for the given key
45
    """
46
    metadata = get_snapshot_metadata(snapshot)
47
    return metadata.get(key, default)
48
49
50
def get_actor(snapshot):
51
    """Get the actor of the snapshot
52
    """
53
    actor = get_meta_value_for(snapshot, "actor")
54
    if not actor:
55
        return get_user_id()
56
    return actor
57
58
59
def get_fullname(snapshot):
60
    """Get the actor's fullname of the snapshot
61
    """
62
    actor = get_actor(snapshot)
63
    properties = api.get_user_properties(actor)
64
    return properties.get("fullname", actor)
65
66
67
def get_action(snapshot):
68
    """Get the action of the snapshot
69
    """
70
    action = get_meta_value_for(snapshot, "action")
71
    if not action:
72
        return "Edit"
73
    return action
74
75
76
def get_created(snapshot):
77
    """Get the created date of the snapshot
78
    """
79
    created = get_meta_value_for(snapshot, "snapshot_created")
80
    if not created:
81
        return ""
82
    return created
83
84
85
@indexer(IAuditable)
86
def actor(instance):
87
    """Last modifiying user
88
    """
89
    last_snapshot = get_last_snapshot(instance)
90
    return get_actor(last_snapshot)
91
92
93
@indexer(IAuditable)
94
def fullname(instance):
95
    """Last modifiying user
96
    """
97
    last_snapshot = get_last_snapshot(instance)
98
    return get_fullname(last_snapshot)
99
100
101
@indexer(IAuditable)
102
def modifiers(instance):
103
    """Returns a list of all users that modified
104
    """
105
    snapshots = get_snapshots(instance)
106
    return map(get_actor, snapshots)
107
108
109
@indexer(IAuditable)
110
def action(instance):
111
    """Returns the last performed action
112
    """
113
    last_snapshot = get_last_snapshot(instance)
114
    return get_action(last_snapshot)
115
116
117
@indexer(IAuditable, IAuditlogCatalog)
118
def listing_searchable_text(instance):
119
    """Fulltext search for the audit metadata
120
    """
121
    # get all snapshots
122
    snapshots = get_snapshots(instance)
123
    # extract all snapshot values, because we are not interested in the
124
    # fieldnames (keys)
125
    values = map(lambda s: s.values(), snapshots)
126
    # prepare a set of unified catalog data
127
    catalog_data = set()
128
    # values to skip
129
    skip_values = ["None", "true", "True", "false", "False"]
130
    # internal uid -> title cache
131
    uid_title_cache = {}
132
133
    # helper function to recursively unpack the snapshot values
134
    def append(value):
135
        if isinstance(value, (list, tuple)):
136
            map(append, value)
137
        elif isinstance(value, (dict)):
138
            map(append, value.items())
139
        elif isinstance(value, six.string_types):
140
            # convert unicode to UTF8
141
            if isinstance(value, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable unicode does not seem to be defined.
Loading history...
142
                value = api.safe_unicode(value).encode("utf8")
143
            # skip single short values
144
            if len(value) < 2:
145
                return
146
            # flush non meaningful values
147
            if value in skip_values:
148
                return
149
            # flush ISO dates
150
            if re.match(DATE_RX, value):
151
                return
152
            # fetch the title
153
            if re.match(UID_RX, value):
154
                if value in uid_title_cache:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable uid_title_cache does not seem to be defined.
Loading history...
155
                    value = uid_title_cache[value]
156
                else:
157
                    title_or_id = get_title_or_id_from_uid(value)
158
                    uid_title_cache[value] = title_or_id
159
                    value = title_or_id
160
            catalog_data.add(value)
161
162
    # extract all meaningful values
163
    for value in itertools.chain(values):
164
        append(value)
165
166
    return " ".join(catalog_data)
167
168
169
@indexer(IAuditable)
170
def snapshot_created(instance):
171
    """Snapshot created date
172
    """
173
    last_snapshot = get_last_snapshot(instance)
174
    snapshot_created = get_created(last_snapshot)
175
    return api.to_date(snapshot_created)
176
177
178
@indexer(IAuditable)
179
def snapshot_version(instance):
180
    """Snapshot created date
181
    """
182
    return get_snapshot_count(instance)
183