Passed
Push — 2.x ( 8da1c4...fb0689 )
by Jordi
20:16 queued 13:33
created

VersionWrapper.get_versioned_data()   A

Complexity

Conditions 4

Size

Total Lines 26
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 26
rs 9.85
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
3
import inspect
4
from copy import deepcopy
5
6
from bika.lims import api
7
from bika.lims.api import snapshot as s_api
8
from bika.lims.utils import tmpID
9
from senaite.core.api import dtime
10
from senaite.core.interfaces import IVersionWrapper
11
from zope.interface import alsoProvides
12
from zope.interface import implementer
13
14
15
@implementer(IVersionWrapper)
class VersionWrapper(object):
    """A content wrapper that retrieves versioned attributes

    The wrapper keeps a reference to the wrapped content and to a clone of it.
    The instance dictionary of the clone holds the data of the loaded snapshot
    version. Attribute lookups that can not be resolved on the wrapper itself
    are delegated to the clone, except for setters (names starting with
    "set"), which are looked up on the wrapped content.
    """

    def __init__(self, content):
        self.content = content
        self.fields = api.get_fields(content)
        self.clone = None
        self.version = 0

    def __repr__(self):
        return "<{}:{}({}@v{})>".format(
            self.__class__.__name__,
            api.get_portal_type(self.content),
            api.get_id(self.content),
            self.version)

    def __getattr__(self, name):
        """Dynamic lookups for attributes

        Python calls this hook only when the regular attribute lookup on the
        wrapper failed.

        :param name: Name of the requested attribute
        :returns: The value of the attribute
        :raises AttributeError: if the attribute could not be resolved
        """
        # Read the wrapper state directly from the instance dictionary.
        # Accessing `self.content` or `self.clone` here would re-enter this
        # method and recurse endlessly if the attribute is not set (yet),
        # e.g. when the instance was created without calling `__init__`.
        state = self.__dict__
        content = state.get("content")
        clone = state.get("clone")

        # support for tab completion in PDB
        if name == "__members__":
            return [k for k, v in inspect.getmembers(content)]

        # try to lookup the value from the snapshot
        # NOTE(review): nothing in this class ever sets a `snapshot`
        # attribute. The lookup is therefore only done if one was assigned
        # to the instance from outside, instead of failing with a misleading
        # AttributeError for every name found in the content `__dict__`.
        snapshot = state.get("snapshot")
        if snapshot and name in snapshot:
            if name in getattr(content, "__dict__", {}):
                return snapshot.get(name)

        # load setters from the wrapped content directly and all other
        # attributes from the clone
        target = content if name.startswith("set") else clone

        # A real lookup (instead of a truth test on the result) ensures that
        # falsy values like `0`, `""`, `False` or `None` are returned as well
        try:
            return getattr(target, name)
        except AttributeError:
            raise AttributeError("'{}' object has no attribute '{}'".format(
                self.__class__.__name__, name))

    def get_version(self):
        """Returns the currently loaded version number
        """
        return self.version

    def get_clone(self):
        """Returns the clone of the loaded version or None if not loaded yet
        """
        return self.clone

    def load_latest_version(self):
        """Load the latest version of the content
        """
        version = s_api.get_version(self.content)
        self.load_version(version)

    def make_metaclass(self, prefix="Clone"):
        """Returns a new subclass of the content class

        NOTE: Despite the method name, the returned object is a regular class
        derived from the class of the wrapped content and not a metaclass.

        :param prefix: Prefix for the name of the generated class
        :returns: Class named `prefix` + name of the content class
        """
        cls_base = self.content.__class__
        cls_name = cls_base.__name__
        cls_dict = {"__module__": cls_base.__module__}
        return type(prefix + cls_name, (cls_base, ), cls_dict)

    def load_version(self, version=0):
        """Load a snapshot version

        :param version: Version to load from the snapshot storage
        :raises KeyError: if no snapshot exists for the given version
        """
        MetaClass = self.make_metaclass()
        clone = MetaClass(tmpID())

        # make acquisition chain lookups possible
        clone = clone.__of__(self.content.aq_parent)

        # apply the versioned data to the clone
        clone.__dict__ = self.get_versioned_data(version)

        # apply class interfaces manually
        class_ifaces = self.content.__class__.__implemented__.flattened()
        alsoProvides(clone, *class_ifaces)

        # remember the clone and loaded version
        self.clone = clone
        self.version = version

    def get_versioned_data(self, version):
        """Get the versioned data of the current content

        :param version: Version to fetch from the snapshot storage
        :returns: dictionary of versioned data
        :raises KeyError: if no snapshot exists for the given version
        """
        # fetch the snapshot of the object first to fail early w/o copying
        snapshot = s_api.get_snapshot_by_version(self.content, version)
        if not snapshot:
            raise KeyError("Version %s not found" % version)

        current = self.content.__dict__

        # keep a copy of the original values if we have no snapshot value.
        # Values that get replaced by the snapshot are not copied at all.
        out = deepcopy(dict(
            (key, value) for key, value in current.items()
            if key not in snapshot))

        # assign the processed snapshot values
        for key in current:
            if key in snapshot:
                out[key] = self.process_snapshot_value(key, snapshot)

        return out

    def process_snapshot_value(self, key, snapshot):
        """Convert stringified snapshot values

        We try to match the required field type of the content object w/o using
        setters of the cloned object, as this might have side effects
        (reindexing, additional logic etc.).

        :param key: Processing key
        :param snapshot: The versioned snapshot
        :returns: Processed snapshot value
        """
        value = snapshot.get(key)

        # values w/o a matching field are returned unchanged
        field = self.fields.get(key)
        if not field:
            return value

        # directly convert empties, None and bool values
        if not value:
            return value
        if value == "None":
            return None
        if value in ("True", "False"):
            return value == "True"

        # guess the required value type depending on the used field
        fieldclass = field.__class__.__name__.lower()

        if fieldclass.startswith("date"):
            # convert date value
            return dtime.to_DT(value)
        if fieldclass.startswith("integer"):
            return int(value)
        if fieldclass.startswith("float"):
            return float(value)
        if fieldclass == "fixedpointfield":
            # AT fixedpoint field
            return field._to_tuple(self.content, value)
        if fieldclass == "durationfield":
            # AT duration field
            return {str(part): int(amount) for part, amount in value.items()}

        # All other values, including those of AT emails and record-like
        # fields, are returned unchanged. Empty values were already returned
        # above, so no empty fallback is needed at this point.
        return value
173
174
175
def VersionWrapperFactory(context):
    """Create a version wrapper for the context with the latest version loaded

    :param context: The content object to wrap
    :returns: `VersionWrapper` instance of the given context
    """
    versioned = VersionWrapper(context)
    versioned.load_latest_version()
    return versioned
179