Passed
Pull Request — master (#45)
by
unknown
02:29
created

MonitoredItemService.datachange_callback()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 4
dl 0
loc 20
rs 9.5
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
8
9
class MonitoredItemData:
10
    def __init__(self):
11
        self.client_handle = None
12
        self.callback_handle = None
13
        self.monitored_item_id = None
14
        self.mode = None
15
        self.filter = None
16
        self.mvalue = MonitoredItemValues()
17
        self.where_clause_evaluator = None
18
        self.queue_size = 0
19
20
21
class MonitoredItemValues:
22
    def __init__(self):
23
        self.current_value = None
24
        self.old_value = None
25
26
    def set_current_value(self, cur_val):
27
        self.old_value = self.current_value
28
        self.current_value = cur_val
29
30
    def get_current_value(self):
31
        return self.current_value
32
33
    def get_old_value(self):
34
        return self.old_value
35
36
37
class MonitoredItemService:
38
    """
39
    Implements monitored item service for one subscription
40
    """
41
42
    def __init__(self, isub, aspace):
43
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
44
        self.isub = isub
45
        self.aspace = aspace
46
        self._monitored_items = {}
47
        self._monitored_events = {}
48
        self._monitored_datachange = {}
49
        self._monitored_item_counter = 111
50
51
    def __str__(self):
52
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
53
54
    def delete_all_monitored_items(self):
55
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
56
57
    async def create_monitored_items(self, params):
58
        results = []
59
        for item in params.ItemsToCreate:
60
            # with self._lock:
61
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
62
                result = self._create_events_monitored_item(item)
63
            else:
64
                result = self._create_data_change_monitored_item(item)
65
            results.append(result)
66
        return results
67
68
    def modify_monitored_items(self, params):
69
        results = []
70
        for item in params.ItemsToModify:
71
            results.append(self._modify_monitored_item(item))
72
        return results
73
74
    def trigger_datachange(self, handle, nodeid, attr):
75
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
76
        variant = self.aspace.get_attribute_value(nodeid, attr)
77
        self.datachange_callback(handle, variant)
78
79
    def _modify_monitored_item(self, params):
80
        for mdata in self._monitored_items.values():
81
            result = ua.MonitoredItemModifyResult()
82
            if mdata.monitored_item_id == params.MonitoredItemId:
83
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
84
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
85
                if params.RequestedParameters.Filter is not None:
86
                    mdata.filter = params.RequestedParameters.Filter
87
                mdata.queue_size = params.RequestedParameters.QueueSize
88
                return result
89
        result = ua.MonitoredItemModifyResult()
90
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
91
        return result
92
93
    def _commit_monitored_item(self, result, mdata):
94
        if result.StatusCode.is_good():
95
            self._monitored_items[result.MonitoredItemId] = mdata
96
            self._monitored_item_counter += 1
97
98
    def _make_monitored_item_common(self, params):
99
        result = ua.MonitoredItemCreateResult()
100
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
101
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
102
        self._monitored_item_counter += 1
103
        result.MonitoredItemId = self._monitored_item_counter
104
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
105
106
        mdata = MonitoredItemData()
107
        mdata.mode = params.MonitoringMode
108
        mdata.client_handle = params.RequestedParameters.ClientHandle
109
        mdata.monitored_item_id = result.MonitoredItemId
110
        mdata.queue_size = params.RequestedParameters.QueueSize
111
        mdata.filter = params.RequestedParameters.Filter
112
113
        return result, mdata
114
115
    def _create_events_monitored_item(self, params):
116
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
117
                         params.ItemToMonitor.AttributeId)
118
119
        result, mdata = self._make_monitored_item_common(params)
120
        ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId,
121
                                                         ua.AttributeIds.EventNotifier).Value.Value
122
123
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
124
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
125
            return result
126
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
127
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
128
        self._commit_monitored_item(result, mdata)
129
        if params.ItemToMonitor.NodeId not in self._monitored_events:
130
            self._monitored_events[params.ItemToMonitor.NodeId] = []
131
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
132
        return result
133
134
    def _create_data_change_monitored_item(self, params):
135
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
136
                         params.ItemToMonitor.AttributeId)
137
138
        result, mdata = self._make_monitored_item_common(params)
139
        result.FilterResult = params.RequestedParameters.Filter
140
        result.StatusCode, handle = self.aspace.add_datachange_callback(
141
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
142
143
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
144
        mdata.callback_handle = handle
145
        self._commit_monitored_item(result, mdata)
146
        if result.StatusCode.is_good():
147
            self._monitored_datachange[handle] = result.MonitoredItemId
148
            # force data change event generation
149
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
150
        return result
151
152
    def delete_monitored_items(self, ids):
153
        self.logger.debug("delete monitored items %s", ids)
154
        # with self._lock:
155
        results = []
156
        for mid in ids:
157
            results.append(self._delete_monitored_items(mid))
158
        return results
159
160
    def _delete_monitored_items(self, mid):
161
        if mid not in self._monitored_items:
162
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
163
        for k, v in self._monitored_events.items():
164
            if mid in v:
165
                v.remove(mid)
166
                if not v:
167
                    self._monitored_events.pop(k)
168
                break
169
        for k, v in self._monitored_datachange.items():
170
            if v == mid:
171
                self.aspace.delete_datachange_callback(k)
172
                self._monitored_datachange.pop(k)
173
                break
174
        self._monitored_items.pop(mid)
175
        return ua.StatusCode()
176
177
    def datachange_callback(self, handle, value, error=None):
178
        if error:
179
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
180
                             handle, error)
181
            self.trigger_statuschange(error)
182
        else:
183
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
184
                             handle, value.Value)
185
            event = ua.MonitoredItemNotification()
186
            mid = self._monitored_datachange[handle]
187
            mdata = self._monitored_items[mid]
188
            mdata.mvalue.set_current_value(value.Value.Value)
189
            if mdata.filter:
190
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
191
            else:
192
                deadband_flag_pass = True
193
            if deadband_flag_pass:
194
                event.ClientHandle = mdata.client_handle
195
                event.Value = value
196
                self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
197
198
    def deadband_callback(self, values, flt):
199
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
200
            return True
201
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
202
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
203
            return True
204
        if flt.DeadbandType == ua.DeadbandType.Percent:
205
            self.logger.warning("DeadbandType Percent is not implemented !")
206
            return True
207
        return False
208
209
    def trigger_event(self, event):
210
        if event.emitting_node not in self._monitored_events:
211
            self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.emitting_node)
212
            return False
213
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
214
        mids = self._monitored_events[event.emitting_node]
215
        for mid in mids:
216
            self._trigger_event(event, mid)
217
        return True
218
219
    def _trigger_event(self, event, mid):
220
        if mid not in self._monitored_items:
221
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
222
                              self)
223
            return
224
        mdata = self._monitored_items[mid]
225
        if not mdata.where_clause_evaluator.eval(event):
226
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
227
            return
228
        fieldlist = ua.EventFieldList()
229
        fieldlist.ClientHandle = mdata.client_handle
230
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
231
        self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
232
233
    def trigger_statuschange(self, code):
234
        self.isub.enqueue_statuschange(code)
235
236
237
class WhereClauseEvaluator:
238
    def __init__(self, logger, aspace, whereclause):
239
        self.logger = logger
240
        self.elements = whereclause.Elements
241
        self._aspace = aspace
242
243
    def eval(self, event):
244
        if not self.elements:
245
            return True
246
        # spec says we should only evaluate first element, which may use other elements
247
        try:
248
            res = self._eval_el(0, event)
249
        except Exception as ex:
250
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
251
                                  ex)
252
            return False
253
        return res
254
255
    def _eval_el(self, index, event):
256
        el = self.elements[index]
257
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
258
        ops = el.FilterOperands  # just to make code more readable
259
        if el.FilterOperator == ua.FilterOperator.Equals:
260
            return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
261
        if el.FilterOperator == ua.FilterOperator.IsNull:
262
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
263
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
264
            return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
265
        if el.FilterOperator == ua.FilterOperator.LessThan:
266
            return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
267
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
268
            return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
269
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
270
            return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
271
        if el.FilterOperator == ua.FilterOperator.Like:
272
            return self._like_operator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
273
        if el.FilterOperator == ua.FilterOperator.Not:
274
            return not self._eval_op(ops[0], event)
275
        if el.FilterOperator == ua.FilterOperator.Between:
276
            return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
277
        if el.FilterOperator == ua.FilterOperator.InList:
278
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
279
        if el.FilterOperator == ua.FilterOperator.And:
280
            self.elements(ops[0].Index)
281
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
282
        if el.FilterOperator == ua.FilterOperator.Or:
283
            return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
284
        if el.FilterOperator == ua.FilterOperator.Cast:
285
            self.logger.warn("Cast operand not implemented, assuming True")
286
            return True
287
        if el.FilterOperator == ua.FilterOperator.OfType:
288
            return event.EventType == self._eval_op(ops[0], event)
289
        # TODO: implement missing operators
290
        self.logger.warning("WhereClause not implemented for element: %s", el)
291
        raise NotImplementedError
292
293
    def _like_operator(self, string, pattern):
294
        raise NotImplementedError
295
296
    def _eval_op(self, op, event):
297
        # seems spec says we should return Null if issues
298
        if isinstance(op, ua.ElementOperand):
299
            return self._eval_el(op.Index, event)
300
        if isinstance(op, ua.AttributeOperand):
301
            if op.BrowsePath:
302
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
303
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
304
            # FIXME: check, this is probably broken
305
        if isinstance(op, ua.SimpleAttributeOperand):
306
            if op.BrowsePath:
307
                # we only support depth of 1
308
                return getattr(event, op.BrowsePath[0].Name)
309
            # TODO: write code for index range.... but doe it make any sense
310
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
311
        if isinstance(op, ua.LiteralOperand):
312
            return op.Value.Value
313
        self.logger.warning("Where clause element % is not of a known type", op)
314
        raise NotImplementedError
315