Passed
Push — 2.x ( 488b85...c0712a )
by Jordi
06:35
created

senaite.core.astm.importer.ASTMImporter.sample()   A

Complexity

Conditions 4

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 23
rs 9.65
c 0
b 0
f 0
cc 4
nop 1
1
# -*- coding: utf-8 -*-
2
3
from datetime import datetime
4
5
from bika.lims import api
6
from senaite.core.catalog import SAMPLE_CATALOG
7
from senaite.core.catalog import SETUP_CATALOG
8
from senaite.core.interfaces import IASTMImporter
9
from senaite.core.interfaces import ISenaiteCore
10
from six import StringIO
11
from zope.component import adapter
12
from zope.interface import Interface
13
from zope.interface import implementer
14
15
16
@adapter(Interface, Interface, ISenaiteCore)
@implementer(IASTMImporter)
class ASTMImporter(object):
    """Base adapter to import JSON ASTM messages

    The adapter wraps the parsed instrument data, the message it was parsed
    from and the current request. It provides helpers to look up the
    instrument and the sample the message refers to, to write an import log
    and to access the individual ASTM records.

    The import logic itself has to be provided by instrument specific
    importers, see `import_data`.
    """

    def __init__(self, data, message, request):
        """Initialize the importer

        :param data: Mapping of the parsed message. The records are looked up
                     by the keys "H", "O", "C", "P", "R" and "M"; additional
                     parser information is looked up by the key "metadata"
        :param message: The message the data was parsed from
        :param request: The current request
        """
        self.data = data
        self.message = message
        self.request = request

        # internals for properties
        self._importlog = None
        self._instrument = None
        self._sample = None

    def import_data(self):
        """Import the instrument JSON data

        :raises NotImplementedError: always, the import logic has to be
                                     implemented by a specific importer
        """
        raise NotImplementedError("ASTM import logic must be implemented "
                                  "by a specific instrument importer")

    @property
    def instrument(self):
        """Query the instrument of this import

        All instruments of the setup catalog are compared with the sender
        name and serial of the message header. The best match wins, in this
        order of priority:

            1. model and serial
            2. title and serial
            3. model only
            4. title only

        :returns: The matching instrument object or None
        """
        if self._instrument:
            return self._instrument

        instrument = None
        name = self.get_instrument_name()
        serial = self.get_instrument_serial()

        # Without a sender name there is nothing to compare with. Searching
        # anyhow would match any instrument that has no model set.
        if name:
            best_rank = None
            query = {"portal_type": "Instrument"}
            for brain in api.search(query, SETUP_CATALOG):
                obj = api.get_object(brain)
                rank = self._get_match_rank(obj, name, serial)
                if rank is None:
                    continue
                # A weaker match must never replace a stronger one that was
                # found earlier. Among equally good matches the last one
                # wins, as it did before the ranking was introduced.
                if best_rank is None or rank <= best_rank:
                    instrument = obj
                    best_rank = rank

        self._instrument = instrument
        return instrument

    def _get_match_rank(self, obj, name, serial):
        """Rank how well an instrument matches the sender name and serial

        :param obj: Instrument object to compare
        :param name: Instrument name of the message sender
        :param serial: Instrument serial number of the message sender
        :returns: 0 for model and serial, 1 for title and serial, 2 for model
                  only, 3 for title only or None if there is no match
        """
        model = obj.getModel()
        title = obj.Title()
        same_serial = obj.getSerialNo() == serial

        if model == name and same_serial:
            return 0
        if title == name and same_serial:
            return 1
        if model == name:
            return 2
        if title == name:
            return 3
        return None

    @property
    def sample(self):
        """Query the sample of this import

        The sample is searched by its ID first and by the Client Sample ID if
        the first search returned no results.

        :returns: The sample object if exactly one sample was found, None if
                  no sample ID was provided or the result is not unique
        """
        if self._sample:
            return self._sample

        sample = None
        # get the sample ID from the data
        sid = self.get_sample_id()

        # Never search without a sample ID.
        # NOTE(review): presumably the catalog ignores a search key with a
        # `None` value, which would match unrelated samples -- confirm.
        if sid:
            # Query by Sample ID
            query_sid = {"getId": sid}
            # Query by Client Sample ID
            query_csid = {"getClientSampleID": sid}

            results = api.search(query_sid, SAMPLE_CATALOG)
            if not results:
                results = api.search(query_csid, SAMPLE_CATALOG)
            # only an unambiguous result is accepted
            if len(results) == 1:
                sample = api.get_object(results[0])

        self._sample = sample
        return sample

    def log(self, message, level="info"):
        """Append log to logs

        The message is prefixed with the current timestamp and the log level.
        An "AutoImportLog" is created inside the instrument on the first call
        if the instrument could be found. Without an instrument, the message
        is only formatted, but not stored.

        :param message: The message to log
        :param level: Name of the log level, written in upper case
        :returns: The formatted log message
        """
        timestamp = datetime.now()
        message = "{} {}: {}".format(timestamp, level.upper(), message)
        if self.instrument and self._importlog is None:
            # the sender is `None` if the message contains no single header
            sender = self.get_sender() or ()
            self._importlog = api.create(self.instrument, "AutoImportLog")
            self._importlog.setInstrument(self.instrument)
            self._importlog.setInterface(", ".join(sender))
            self._importlog.setImportFile("ASTM")
            self._importlog.setLogTime(timestamp.isoformat())
        if self._importlog:
            # avoid to write the text "None" in front of the first message
            results = self._importlog.getResults() or ""
            if results:
                results += "\n"
            messages = "{}{}".format(results, message)
            self._importlog.setResults(messages)
        return message

    def get_sample_id(self, default=None):
        """Get the Sample ID

        :param default: Value to return if no sample ID could be found
        :returns: The value of "sample_id" of the order record or the default
        """
        order = self.get_order()
        if not order:
            return default
        sid = order.get("sample_id")
        if not sid:
            return default
        return sid

    def get_instrument_name(self):
        """Get the instrument name

        :returns: Name of the sender or an empty string if there is no sender
        """
        return self._get_sender_field(0)

    def get_instrument_serial(self):
        """Get the instrument serial number

        :returns: Serial of the sender or an empty string if there is no
                  sender
        """
        return self._get_sender_field(1)

    def get_instrument_version(self):
        """Get the instrument version

        :returns: Version of the sender or an empty string if there is no
                  sender
        """
        return self._get_sender_field(2)

    def _get_sender_field(self, index):
        """Get a single value of the sender tuple

        :param index: Position in the tuple returned by `get_sender`
        :returns: The value at the given position or an empty string if no
                  sender is available
        """
        sender = self.get_sender()
        # `get_sender` returns `None` if there is no single header record
        if not sender:
            return ""
        return sender[index]

    def get_sender(self):
        """Return the instrument name, serial and version

        :returns: Tuple of instrument name, serial, version or None if the
                  message does not contain exactly one header record
        """
        header = self.get_header()
        if not header:
            return None
        # tolerate a sender that is explicitly set to `None`
        sender = header.get("sender") or {}
        name = sender.get("name", "")
        serial = sender.get("serial", "")
        version = sender.get("version", "")
        return name, serial, version

    def create_attachment(self, container, contents, filename=None):
        """Create a new attachment with the given contents

        :param container: Object where the attachment is created in
        :param contents: Text to store as the attachment file
        :param filename: Used as attachment title and as name of the file
        :returns: The created attachment
        """
        attachment = api.create(container, "Attachment", title=filename)
        attachment_file = StringIO(contents)
        attachment_file.filename = filename
        attachment.setAttachmentFile(attachment_file)
        # do not render in report
        attachment.setRenderInReport(False)
        return attachment

    def get_metadata(self):
        """Metadata provided by the instrument parser module

        :returns: Value of the key "metadata" or an empty dictionary
        """
        return self.data.get("metadata", {})

    def get_header(self):
        """Get the header record of the message

        :returns: The header record if the message contains exactly one,
                  otherwise an empty dictionary
        """
        headers = self.get_headers()
        if len(headers) != 1:
            return {}
        return headers[0]

    def get_order(self):
        """Get the order record of the message

        :returns: The order record if the message contains exactly one,
                  otherwise an empty dictionary
        """
        orders = self.get_orders()
        if len(orders) != 1:
            return {}
        return orders[0]

    def get_headers(self):
        """Get all records stored below the key "H"
        """
        return self.data.get("H", [])

    def get_orders(self):
        """Get all records stored below the key "O"
        """
        return self.data.get("O", [])

    def get_comments(self):
        """Get all records stored below the key "C"
        """
        return self.data.get("C", [])

    def get_patients(self):
        """Get all records stored below the key "P"
        """
        return self.data.get("P", [])

    def get_results(self):
        """Get all records stored below the key "R"
        """
        return self.data.get("R", [])

    def get_manufacturer_infos(self):
        """Get all records stored below the key "M"
        """
        return self.data.get("M", [])