Passed
Pull Request — master (#365)
by
unknown
02:53
created

MonitoredItemService._trigger_event()   B

Complexity

Conditions 8

Size

Total Lines 24
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 3
dl 0
loc 24
rs 7.3333
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
    """
    Server-side record kept for a single monitored item.

    Every field starts out unset; the service that creates the monitored item
    fills the fields in afterwards.
    """

    def __init__(self):
        # handles / identifiers, unknown until the item has been created
        self.client_handle = self.callback_handle = self.monitored_item_id = None
        # monitoring mode and filter requested by the client
        self.mode = self.filter = None
        # tracks the current and the previous value seen for this item
        self.mvalue = MonitoredItemValues()
        # only set for event monitored items
        self.where_clause_evaluator = None
        self.queue_size = 0
21
22
23
class MonitoredItemValues:
    """
    Holds the latest value of a monitored item together with the one before it.
    """

    def __init__(self):
        # nothing has been observed yet
        self.current_value = self.old_value = None

    def set_current_value(self, cur_val):
        """Store *cur_val* as the current value; the former current value becomes the old one."""
        self.old_value, self.current_value = self.current_value, cur_val

    def get_current_value(self):
        """Return the value passed to the most recent set_current_value() call (None if never set)."""
        return self.current_value

    def get_old_value(self):
        """Return the value that was current before the most recent set_current_value() call."""
        return self.old_value
37
38
39
class MonitoredItemService:
    """
    Implements monitored item service for one subscription

    Bookkeeping kept per subscription:

    * ``_monitored_items``: monitored item id -> MonitoredItemData
    * ``_monitored_events``: node id -> list of monitored item ids to notify
    * ``_monitored_datachange``: address space callback handle -> monitored item id
    * ``_events_with_retain``: emitting node -> last event seen with Retain set
    * ``_mid_with_retain``: monitored item id -> events seen with Retain set
    """

    def __init__(self, isub, aspace: AddressSpace):
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
        self.isub = isub
        self.aspace: AddressSpace = aspace
        self._monitored_items: Dict[int, MonitoredItemData] = {}
        self._monitored_events = {}
        self._monitored_datachange: Dict[int, int] = {}
        # ids are handed out by incrementing this counter before each use
        self._monitored_item_counter = 111
        self._events_with_retain = {}
        self._mid_with_retain = {}

    def __str__(self):
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"

    def delete_all_monitored_items(self):
        """Delete every monitored item currently registered on this service."""
        # the id list is fully built before anything is removed
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
        """
        Create one monitored item per entry of ``params.ItemsToCreate``.

        Items on the EventNotifier attribute become event monitored items, all
        others become data change monitored items. Returns the list of create
        results in request order.
        """
        results = []
        for item in params.ItemsToCreate:
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                result = self._create_events_monitored_item(item)
            else:
                result = await self._create_data_change_monitored_item(item)
            results.append(result)
        return results

    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
        """Apply every entry of ``params.ItemsToModify``; returns the modify results in request order."""
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    async def trigger_datachange(self, handle, nodeid, attr):
        """Read the attribute from the address space and push it through datachange_callback()."""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        dv = self.aspace.read_attribute_value(nodeid, attr)
        await self.datachange_callback(handle, dv)

    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
        """
        Update filter and queue size of one monitored item.

        Returns a result carrying BadMonitoredItemIdInvalid when the id is not
        registered on this service.
        """
        result = ua.MonitoredItemModifyResult()
        # _monitored_items is keyed by monitored item id (see _commit_monitored_item)
        mdata = self._monitored_items.get(params.MonitoredItemId)
        if mdata is None:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
            return result
        requested = params.RequestedParameters
        result.RevisedSamplingInterval = requested.SamplingInterval
        result.RevisedQueueSize = requested.QueueSize
        if requested.Filter is not None:
            mdata.filter = requested.Filter
        mdata.queue_size = requested.QueueSize
        return result

    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
        """Register ``mdata`` under the id in ``result``, but only if the result status is good."""
        if result.StatusCode.is_good():
            self._monitored_items[result.MonitoredItemId] = mdata

    def _make_monitored_item_common(self, params):
        """
        Build the create result and the MonitoredItemData shared by both item kinds.

        Allocates a new monitored item id. Returns ``(result, mdata)``; nothing
        is registered yet.
        """
        result = ua.MonitoredItemCreateResult()
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
        self._monitored_item_counter += 1
        result.MonitoredItemId = self._monitored_item_counter
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
        mdata = MonitoredItemData()
        mdata.mode = params.MonitoringMode
        mdata.client_handle = params.RequestedParameters.ClientHandle
        mdata.monitored_item_id = result.MonitoredItemId
        mdata.queue_size = params.RequestedParameters.QueueSize
        mdata.filter = params.RequestedParameters.Filter
        return result, mdata

    def _register_event_source(self, node_id, mid: int):
        """Record that monitored item ``mid`` wants events emitted for ``node_id`` (at most once per node)."""
        mids = self._monitored_events.setdefault(node_id, [])
        if mid not in mids:
            mids.append(mid)

    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
        """
        Create an event monitored item.

        The result carries BadServiceUnsupported when the node's EventNotifier
        attribute is unset or does not have the SubscribeToEvents bit. Otherwise
        the item is registered for the monitored node itself and for every node
        reached through the reference walk below.
        """
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
                                                          ua.AttributeIds.EventNotifier).Value.Value

        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
            return result
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
        self._commit_monitored_item(result, mdata)

        mid = result.MonitoredItemId
        # reference type ids in the standard namespace:
        # i=41 is GeneratesEvent, i=48 is HasNotifier
        generates_event = ua.NumericNodeId(41)
        has_notifier = ua.NumericNodeId(48)
        visited = set()  # guards against reference cycles and repeated walks

        def find_events(node_id):
            if node_id in visited:
                return
            visited.add(node_id)
            for ref in self.aspace.get(node_id).references:
                if ref.ReferenceTypeId == generates_event:
                    self._register_event_source(ref.NodeId, mid)
                elif ref.ReferenceTypeId == has_notifier:
                    find_events(ref.NodeId)

        find_events(params.ItemToMonitor.NodeId)
        self._register_event_source(params.ItemToMonitor.NodeId, mid)

        return result

    async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
        """
        Create a data change monitored item.

        Registers datachange_callback() on the address space; when that
        succeeds, an initial notification is generated right away.
        """
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
                         params.ItemToMonitor.AttributeId)

        result, mdata = self._make_monitored_item_common(params)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)

        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        self._commit_monitored_item(result, mdata)
        if result.StatusCode.is_good():
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
        return result

    def delete_monitored_items(self, ids):
        """Delete the monitored items with the given ids; returns one status code per id."""
        self.logger.debug("delete monitored items %s", ids)
        return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid: int):
        """
        Remove one monitored item and all bookkeeping that refers to it.

        Returns BadMonitoredItemIdInvalid when ``mid`` is unknown, a default
        status code otherwise.
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # An event item may be registered under several nodes, so every list
        # has to be cleaned. Iterate over a snapshot since entries are removed.
        for node_id, mids in list(self._monitored_events.items()):
            if mid in mids:
                mids[:] = [other for other in mids if other != mid]
                if not mids:
                    del self._monitored_events[node_id]
        for handle, monitored_id in list(self._monitored_datachange.items()):
            if monitored_id == mid:
                self.aspace.delete_datachange_callback(handle)
                del self._monitored_datachange[handle]
                break
        # retained events of a deleted item must not be kept around
        self._mid_with_retain.pop(mid, None)
        del self._monitored_items[mid]
        return ua.StatusCode()

    async def datachange_callback(self, handle: int, value, error=None):
        """
        Callback registered on the address space for data change items.

        With ``error`` set, a status change is enqueued on the subscription.
        Otherwise the value is recorded and, if the item's filter lets it pass,
        a data change notification is enqueued.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
                             handle, error)
            await self.trigger_statuschange(error)
            return
        mid = self._monitored_datachange[handle]
        mdata = self._monitored_items[mid]
        mdata.mvalue.set_current_value(value.Value.Value)
        # without a filter every change is reported
        if mdata.filter and not self.deadband_callback(mdata.mvalue, mdata.filter):
            return
        event = ua.MonitoredItemNotification()
        event.ClientHandle = mdata.client_handle
        event.Value = value
        await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)

    def deadband_callback(self, values, flt):
        """
        Return True when the change held in ``values`` passes the deadband of ``flt``.

        A change always passes when there is no deadband or no previous value.
        An absolute deadband passes only when the difference exceeds
        ``flt.DeadbandValue``. Percent deadband is not implemented and passes.
        """
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
            return True
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
            return True
        if flt.DeadbandType == ua.DeadbandType.Percent:
            self.logger.warning("DeadbandType Percent is not implemented !")
            return True
        return False

    async def trigger_event(self, event):
        """
        Dispatch ``event`` to every monitored item registered for its emitting node.

        Events that have a ``Retain`` attribute are remembered per emitting node
        while Retain is set and forgotten once it is cleared. Returns False when
        nothing is registered for the emitting node, True otherwise.
        """
        if hasattr(event, 'Retain'):
            if event.Retain:
                self._events_with_retain[event.emitting_node] = event
            else:
                # the node may never have had a retained event
                self._events_with_retain.pop(event.emitting_node, None)
        if event.emitting_node not in self._monitored_events:
            self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
            return False
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
        mids = self._monitored_events[event.emitting_node]
        for mid in mids:
            await self._trigger_event(event, mid)
        return True

    async def _trigger_event(self, event, mid: int):
        """
        Deliver ``event`` to monitored item ``mid``.

        Keeps the per-item list of retained events up to date, then enqueues the
        event fields on the subscription unless the item's where clause rejects
        the event. Unknown ids are ignored.
        """
        if mid not in self._monitored_items:
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
                              self)
            return
        if hasattr(event, 'Retain'):
            if event.Retain:
                retained = self._mid_with_retain.setdefault(mid, [])
                if event not in retained:
                    retained.append(event)
            else:
                # the event may never have been recorded for this item
                retained = self._mid_with_retain.get(mid)
                if retained is not None:
                    if event in retained:
                        retained.remove(event)
                    if not retained:
                        del self._mid_with_retain[mid]
        mdata = self._monitored_items[mid]
        if not mdata.where_clause_evaluator.eval(event):
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
            return
        fieldlist = ua.EventFieldList()
        fieldlist.ClientHandle = mdata.client_handle
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
        await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)

    async def trigger_statuschange(self, code):
        """Enqueue a status change with ``code`` on the subscription."""
        await self.isub.enqueue_statuschange(code)

    async def condition_refresh(self):
        """Re-trigger every event remembered in ``_events_with_retain``."""
        # snapshot: trigger_event() drops entries whose Retain flag was cleared
        for event in list(self._events_with_retain.values()):
            await self.trigger_event(event)

    async def condition_refresh2(self, mid):
        """
        Re-deliver the retained events of monitored item ``mid``.

        This is a coroutine and must be awaited, because delivery goes through
        the coroutine _trigger_event().
        """
        # snapshot: _trigger_event() may edit the retained list
        for event in list(self._mid_with_retain.get(mid, ())):
            await self._trigger_event(event, mid)
273
274
275
class WhereClauseEvaluator:
    """
    Evaluates the where clause of an event filter against an event.

    ``whereclause.Elements`` is the list of filter elements; operands of type
    ElementOperand refer to other entries of that list by index.
    """

    def __init__(self, logger, aspace: AddressSpace, whereclause):
        self.logger = logger
        self.elements = whereclause.Elements
        self._aspace = aspace

    def eval(self, event):
        """
        Return whether ``event`` passes the where clause.

        An empty clause accepts every event. Any exception raised during
        evaluation (including unimplemented operators) is logged and treated as
        "does not pass".
        """
        if not self.elements:
            return True
        # spec says we should only evaluate first element, which may use other elements
        try:
            res = self._eval_el(0, event)
        except Exception as ex:
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
                                  ex)
            return False
        return res

    def _eval_el(self, index, event):
        """
        Evaluate the filter element at ``index`` for ``event``.

        Raises NotImplementedError for operators that are not supported.
        """
        el = self.elements[index]
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
        ops = el.FilterOperands  # just to make code more readable
        if el.FilterOperator == ua.FilterOperator.Equals:
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.IsNull:
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.LessThan:
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.Like:
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
        if el.FilterOperator == ua.FilterOperator.Not:
            return not self._eval_op(ops[0], event)
        if el.FilterOperator == ua.FilterOperator.Between:
            # operand 0 must lie within [operand 1, operand 2]
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.InList:
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
        if el.FilterOperator == ua.FilterOperator.And:
            # second operand is only evaluated when the first one is truthy
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.Or:
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
        if el.FilterOperator == ua.FilterOperator.Cast:
            self.logger.warning("Cast operand not implemented, assuming True")
            return True
        if el.FilterOperator == ua.FilterOperator.OfType:
            return event.EventType == self._eval_op(ops[0], event)
        # TODO: implement missing operators
        self.logger.warning("WhereClause not implemented for element: %s", el)
        raise NotImplementedError

    def _like_operator(self, string, pattern):
        """Pattern matching for the Like operator; not implemented yet."""
        raise NotImplementedError

    def _eval_op(self, op, event):
        """
        Resolve one filter operand to a value for ``event``.

        Raises NotImplementedError when the operand type is not recognised.
        """
        # seems spec says we should return Null if issues
        if isinstance(op, ua.ElementOperand):
            return self._eval_el(op.Index, event)
        if isinstance(op, ua.AttributeOperand):
            if op.BrowsePath:
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
            # FIXME: check, this is probably broken
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
        if isinstance(op, ua.SimpleAttributeOperand):
            if op.BrowsePath:
                # we only support depth of 1
                return getattr(event, op.BrowsePath[0].Name)
            # TODO: write code for index range.... but does it make any sense
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
        if isinstance(op, ua.LiteralOperand):
            return op.Value.Value
        self.logger.warning("Where clause element %s is not of a known type", op)
        raise NotImplementedError
353