Passed
Push — master ( 3fb232...b005e0 )
by Olivier
02:21
created

MonitoredItemService.datachange_callback()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 4
dl 0
loc 20
rs 9.5
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
12
    def __init__(self):
13
        self.client_handle = None
14
        self.callback_handle = None
15
        self.monitored_item_id = None
16
        self.mode = None
17
        self.filter = None
18
        self.mvalue = MonitoredItemValues()
19
        self.where_clause_evaluator = None
20
        self.queue_size = 0
21
22
23
class MonitoredItemValues:
24
    def __init__(self):
25
        self.current_value = None
26
        self.old_value = None
27
28
    def set_current_value(self, cur_val):
29
        self.old_value = self.current_value
30
        self.current_value = cur_val
31
32
    def get_current_value(self):
33
        return self.current_value
34
35
    def get_old_value(self):
36
        return self.old_value
37
38
39
class MonitoredItemService:
40
    """
41
    Implements monitored item service for one subscription
42
    """
43
44
    def __init__(self, isub, aspace: AddressSpace):
45
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
46
        self.isub = isub
47
        self.aspace: AddressSpace = aspace
48
        self._monitored_items: Dict[int, MonitoredItemData] = {}
49
        self._monitored_events = {}
50
        self._monitored_datachange: Dict[int, int] = {}
51
        self._monitored_item_counter = 111
52
53
    def __str__(self):
54
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
55
56
    def delete_all_monitored_items(self):
57
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
58
59
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
60
        results = []
61
        for item in params.ItemsToCreate:
62
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
63
                result = self._create_events_monitored_item(item)
64
            else:
65
                result = self._create_data_change_monitored_item(item)
66
            results.append(result)
67
        return results
68
69
    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
70
        results = []
71
        for item in params.ItemsToModify:
72
            results.append(self._modify_monitored_item(item))
73
        return results
74
75
    def trigger_datachange(self, handle, nodeid, attr):
76
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
77
        variant = self.aspace.get_attribute_value(nodeid, attr)
78
        self.datachange_callback(handle, variant)
79
80
    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
81
        for mdata in self._monitored_items.values():
82
            result = ua.MonitoredItemModifyResult()
83
            if mdata.monitored_item_id == params.MonitoredItemId:
84
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
85
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
86
                if params.RequestedParameters.Filter is not None:
87
                    mdata.filter = params.RequestedParameters.Filter
88
                mdata.queue_size = params.RequestedParameters.QueueSize
89
                return result
90
        result = ua.MonitoredItemModifyResult()
91
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
92
        return result
93
94
    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
95
        if result.StatusCode.is_good():
96
            self._monitored_items[result.MonitoredItemId] = mdata
97
            self._monitored_item_counter += 1
98
99
    def _make_monitored_item_common(self, params):
100
        result = ua.MonitoredItemCreateResult()
101
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
102
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
103
        self._monitored_item_counter += 1
104
        result.MonitoredItemId = self._monitored_item_counter
105
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
106
        mdata = MonitoredItemData()
107
        mdata.mode = params.MonitoringMode
108
        mdata.client_handle = params.RequestedParameters.ClientHandle
109
        mdata.monitored_item_id = result.MonitoredItemId
110
        mdata.queue_size = params.RequestedParameters.QueueSize
111
        mdata.filter = params.RequestedParameters.Filter
112
        return result, mdata
113
114
    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
115
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
116
                         params.ItemToMonitor.AttributeId)
117
118
        result, mdata = self._make_monitored_item_common(params)
119
        ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId,
120
                                                         ua.AttributeIds.EventNotifier).Value.Value
121
122
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
123
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
124
            return result
125
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
126
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
127
        self._commit_monitored_item(result, mdata)
128
        if params.ItemToMonitor.NodeId not in self._monitored_events:
129
            self._monitored_events[params.ItemToMonitor.NodeId] = []
130
        self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
131
        return result
132
133
    def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
134
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
135
                         params.ItemToMonitor.AttributeId)
136
137
        result, mdata = self._make_monitored_item_common(params)
138
        result.FilterResult = params.RequestedParameters.Filter
139
        result.StatusCode, handle = self.aspace.add_datachange_callback(
140
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
141
142
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
143
        mdata.callback_handle = handle
144
        self._commit_monitored_item(result, mdata)
145
        if result.StatusCode.is_good():
146
            self._monitored_datachange[handle] = result.MonitoredItemId
147
            # force data change event generation
148
            self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
149
        return result
150
151
    def delete_monitored_items(self, ids):
152
        self.logger.debug("delete monitored items %s", ids)
153
        # with self._lock:
154
        results = []
155
        for mid in ids:
156
            results.append(self._delete_monitored_items(mid))
157
        return results
158
159
    def _delete_monitored_items(self, mid: int):
160
        if mid not in self._monitored_items:
161
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
162
        for k, v in self._monitored_events.items():
163
            if mid in v:
164
                v.remove(mid)
165
                if not v:
166
                    self._monitored_events.pop(k)
167
                break
168
        for k, v in self._monitored_datachange.items():
169
            if v == mid:
170
                self.aspace.delete_datachange_callback(k)
171
                self._monitored_datachange.pop(k)
172
                break
173
        self._monitored_items.pop(mid)
174
        return ua.StatusCode()
175
176
    def datachange_callback(self, handle: int, value, error=None):
177
        if error:
178
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
179
                             handle, error)
180
            self.trigger_statuschange(error)
181
        else:
182
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self,
183
                             handle, value.Value)
184
            event = ua.MonitoredItemNotification()
185
            mid = self._monitored_datachange[handle]
186
            mdata = self._monitored_items[mid]
187
            mdata.mvalue.set_current_value(value.Value.Value)
188
            if mdata.filter:
189
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
190
            else:
191
                deadband_flag_pass = True
192
            if deadband_flag_pass:
193
                event.ClientHandle = mdata.client_handle
194
                event.Value = value
195
                self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
196
197
    def deadband_callback(self, values, flt):
198
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
199
            return True
200
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
201
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
202
            return True
203
        if flt.DeadbandType == ua.DeadbandType.Percent:
204
            self.logger.warning("DeadbandType Percent is not implemented !")
205
            return True
206
        return False
207
208
    def trigger_event(self, event):
209
        if event.emitting_node not in self._monitored_events:
210
            self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.emitting_node)
211
            return False
212
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
213
        mids = self._monitored_events[event.emitting_node]
214
        for mid in mids:
215
            self._trigger_event(event, mid)
216
        return True
217
218
    def _trigger_event(self, event, mid: int):
219
        if mid not in self._monitored_items:
220
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
221
                              self)
222
            return
223
        mdata = self._monitored_items[mid]
224
        if not mdata.where_clause_evaluator.eval(event):
225
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
226
            return
227
        fieldlist = ua.EventFieldList()
228
        fieldlist.ClientHandle = mdata.client_handle
229
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
230
        self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
231
232
    def trigger_statuschange(self, code):
233
        self.isub.enqueue_statuschange(code)
234
235
236
class WhereClauseEvaluator:
237
    def __init__(self, logger, aspace: AddressSpace, whereclause):
238
        self.logger = logger
239
        self.elements = whereclause.Elements
240
        self._aspace = aspace
241
242
    def eval(self, event):
243
        if not self.elements:
244
            return True
245
        # spec says we should only evaluate first element, which may use other elements
246
        try:
247
            res = self._eval_el(0, event)
248
        except Exception as ex:
249
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
250
                                  ex)
251
            return False
252
        return res
253
254
    def _eval_el(self, index, event):
255
        el = self.elements[index]
256
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
257
        ops = el.FilterOperands  # just to make code more readable
258
        if el.FilterOperator == ua.FilterOperator.Equals:
259
            return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
260
        if el.FilterOperator == ua.FilterOperator.IsNull:
261
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
262
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
263
            return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
264
        if el.FilterOperator == ua.FilterOperator.LessThan:
265
            return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
266
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
267
            return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
268
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
269
            return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
270
        if el.FilterOperator == ua.FilterOperator.Like:
271
            return self._like_operator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
272
        if el.FilterOperator == ua.FilterOperator.Not:
273
            return not self._eval_op(ops[0], event)
274
        if el.FilterOperator == ua.FilterOperator.Between:
275
            return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
276
        if el.FilterOperator == ua.FilterOperator.InList:
277
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
278
        if el.FilterOperator == ua.FilterOperator.And:
279
            self.elements(ops[0].Index)
280
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
281
        if el.FilterOperator == ua.FilterOperator.Or:
282
            return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
283
        if el.FilterOperator == ua.FilterOperator.Cast:
284
            self.logger.warn("Cast operand not implemented, assuming True")
285
            return True
286
        if el.FilterOperator == ua.FilterOperator.OfType:
287
            return event.EventType == self._eval_op(ops[0], event)
288
        # TODO: implement missing operators
289
        self.logger.warning("WhereClause not implemented for element: %s", el)
290
        raise NotImplementedError
291
292
    def _like_operator(self, string, pattern):
293
        raise NotImplementedError
294
295
    def _eval_op(self, op, event):
296
        # seems spec says we should return Null if issues
297
        if isinstance(op, ua.ElementOperand):
298
            return self._eval_el(op.Index, event)
299
        if isinstance(op, ua.AttributeOperand):
300
            if op.BrowsePath:
301
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
302
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
303
            # FIXME: check, this is probably broken
304
        if isinstance(op, ua.SimpleAttributeOperand):
305
            if op.BrowsePath:
306
                # we only support depth of 1
307
                return getattr(event, op.BrowsePath[0].Name)
308
            # TODO: write code for index range.... but doe it make any sense
309
            return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
310
        if isinstance(op, ua.LiteralOperand):
311
            return op.Value.Value
312
        self.logger.warning("Where clause element % is not of a known type", op)
313
        raise NotImplementedError
314