Test Failed
Pull Request — master (#112)
by
unknown
02:26
created

MonitoredItemService._trigger_event()   B

Complexity

Conditions 8

Size

Total Lines 24
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 3
dl 0
loc 24
rs 7.3333
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
12
    def __init__(self):
13
        self.client_handle = None
14
        self.callback_handle = None
15
        self.monitored_item_id = None
16
        self.mode = None
17
        self.filter = None
18
        self.mvalue = MonitoredItemValues()
19
        self.where_clause_evaluator = None
20
        self.queue_size = 0
21
22
23
class MonitoredItemValues:
24
    def __init__(self):
25
        self.current_value = None
26
        self.old_value = None
27
28
    def set_current_value(self, cur_val):
29
        self.old_value = self.current_value
30
        self.current_value = cur_val
31
32
    def get_current_value(self):
33
        return self.current_value
34
35
    def get_old_value(self):
36
        return self.old_value
37
38
39
class MonitoredItemService:
40
    """
41
    Implements monitored item service for one subscription
42
    """
43
44
    def __init__(self, isub, aspace: AddressSpace):
45
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
46
        self.isub = isub
47
        self.aspace: AddressSpace = aspace
48
        self._monitored_items: Dict[int, MonitoredItemData] = {}
49
        self._monitored_events = {}
50
        self._monitored_datachange: Dict[int, int] = {}
51
        self._monitored_item_counter = 111
52
        self._events_with_retain = {}
53
        self._mid_with_retain = {}
54
55
    def __str__(self):
56
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
57
58
    def delete_all_monitored_items(self):
59
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
60
61
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
62
        results = []
63
        for item in params.ItemsToCreate:
64
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
65
                result = self._create_events_monitored_item(item)
66
            else:
67
                result = await self._create_data_change_monitored_item(item)
68
            results.append(result)
69
        return results
70
71
    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
72
        results = []
73
        for item in params.ItemsToModify:
74
            results.append(self._modify_monitored_item(item))
75
        return results
76
77
    async def trigger_datachange(self, handle, nodeid, attr):
78
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
79
        dv = self.aspace.read_attribute_value(nodeid, attr)
80
        await self.datachange_callback(handle, dv)
81
82
    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
83
        for mdata in self._monitored_items.values():
84
            result = ua.MonitoredItemModifyResult()
85
            if mdata.monitored_item_id == params.MonitoredItemId:
86
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
87
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
88
                if params.RequestedParameters.Filter is not None:
89
                    mdata.filter = params.RequestedParameters.Filter
90
                mdata.queue_size = params.RequestedParameters.QueueSize
91
                return result
92
        result = ua.MonitoredItemModifyResult()
93
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
94
        return result
95
96
    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
97
        if result.StatusCode.is_good():
98
            self._monitored_items[result.MonitoredItemId] = mdata
99
100
    def _make_monitored_item_common(self, params):
101
        result = ua.MonitoredItemCreateResult()
102
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
103
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
104
        self._monitored_item_counter += 1
105
        result.MonitoredItemId = self._monitored_item_counter
106
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
107
        mdata = MonitoredItemData()
108
        mdata.mode = params.MonitoringMode
109
        mdata.client_handle = params.RequestedParameters.ClientHandle
110
        mdata.monitored_item_id = result.MonitoredItemId
111
        mdata.queue_size = params.RequestedParameters.QueueSize
112
        mdata.filter = params.RequestedParameters.Filter
113
        return result, mdata
114
115
    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
116
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
117
                         params.ItemToMonitor.AttributeId)
118
119
        result, mdata = self._make_monitored_item_common(params)
120
        ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
121
                                                          ua.AttributeIds.EventNotifier).Value.Value
122
123
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
124
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
125
            return result
126
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
127
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
128
        self._commit_monitored_item(result, mdata)
129
130
        def find_events(node_id):
131
            for ref in self.aspace.get(node_id).references:
132
                if ref.ReferenceTypeId == ua.NumericNodeId(41):
133
                    if ref.NodeId not in self._monitored_events:
134
                        self._monitored_events[ref.NodeId] = []
135
                    self._monitored_events[ref.NodeId].append(result.MonitoredItemId)
136
                elif ref.ReferenceTypeId == ua.NumericNodeId(48):
137
                    find_events(ref.NodeId)
138
139
        find_events(params.ItemToMonitor.NodeId)
140
        return result
141
142
    async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
143
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
144
                         params.ItemToMonitor.AttributeId)
145
146
        result, mdata = self._make_monitored_item_common(params)
147
        result.FilterResult = params.RequestedParameters.Filter
148
        result.StatusCode, handle = self.aspace.add_datachange_callback(
149
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
150
151
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
152
        mdata.callback_handle = handle
153
        self._commit_monitored_item(result, mdata)
154
        if result.StatusCode.is_good():
155
            self._monitored_datachange[handle] = result.MonitoredItemId
156
            # force data change event generation
157
            await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
158
        return result
159
160
    def delete_monitored_items(self, ids):
161
        self.logger.debug("delete monitored items %s", ids)
162
        # with self._lock:
163
        results = []
164
        for mid in ids:
165
            results.append(self._delete_monitored_items(mid))
166
        return results
167
168
    def _delete_monitored_items(self, mid: int):
169
        if mid not in self._monitored_items:
170
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
171
        for k, v in self._monitored_events.items():
172
            if mid in v:
173
                v.remove(mid)
174
                if not v:
175
                    self._monitored_events.pop(k)
176
                break
177
        for k, v in self._monitored_datachange.items():
178
            if v == mid:
179
                self.aspace.delete_datachange_callback(k)
180
                self._monitored_datachange.pop(k)
181
                break
182
        self._monitored_items.pop(mid)
183
        return ua.StatusCode()
184
185
    async def datachange_callback(self, handle: int, value, error=None):
186
        if error:
187
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
188
                             handle, error)
189
            await self.trigger_statuschange(error)
190
        else:
191
            # self.logger.info(f"subscription {self}: datachange callback called "
192
            #                 f"with handle '{handle}' and value '{value.Value}'")
193
            event = ua.MonitoredItemNotification()
194
            mid = self._monitored_datachange[handle]
195
            mdata = self._monitored_items[mid]
196
            mdata.mvalue.set_current_value(value.Value.Value)
197
            if mdata.filter:
198
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
199
            else:
200
                deadband_flag_pass = True
201
            if deadband_flag_pass:
202
                event.ClientHandle = mdata.client_handle
203
                event.Value = value
204
                await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
205
206
    def deadband_callback(self, values, flt):
207
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
208
            return True
209
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
210
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
211
            return True
212
        if flt.DeadbandType == ua.DeadbandType.Percent:
213
            self.logger.warning("DeadbandType Percent is not implemented !")
214
            return True
215
        return False
216
217
    async def trigger_event(self, event):
218
        if event.emitting_node not in self._monitored_events:
219
            self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
220
            return False
221
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
222
        mids = self._monitored_events[event.EventType]
223
        for mid in mids:
224
            await self._trigger_event(event, mid)
225
        return True
226
227
    async def _trigger_event(self, event, mid: int):
228
        if mid not in self._monitored_items:
229
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
230
                              self)
231
            return
232
        if hasattr(event, 'Retain'):
233
            if event.Retain:
234
                if mid in self._mid_with_retain:
235
                    if event not in self._mid_with_retain[mid]:
236
                        self._mid_with_retain[mid].append(event)
237
                else:
238
                    self._mid_with_retain[mid] = [event]
239
            else:
240
                self._mid_with_retain[mid].remove(event)
241
                if not self._mid_with_retain[mid]:
242
                    del self._mid_with_retain[mid]
243
        mdata = self._monitored_items[mid]
244
        if not mdata.where_clause_evaluator.eval(event):
245
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
246
            return
247
        fieldlist = ua.EventFieldList()
248
        fieldlist.ClientHandle = mdata.client_handle
249
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
250
        await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
251
252
    async def trigger_statuschange(self, code):
253
        await self.isub.enqueue_statuschange(code)
254
255
    def condition_refresh(self):
256
        for event in self._events_with_retain.values():
257
            self.trigger_event(event)
258
259
    def condition_refresh2(self, mid):
260
        if mid in self._mid_with_retain:
261
            for event in self._mid_with_retain[mid]:
262
                self._trigger_event(event, mid)
263
264
    def condition_refresh(self):
265
        for event in self._events_with_retain.values():
266
            self.trigger_event(event)
267
268
    def condition_refresh2(self, mid):
269
        if mid in self._mid_with_retain:
270
            for event in self._mid_with_retain[mid]:
271
                self._trigger_event(event, mid)
272
273
274
class WhereClauseEvaluator:
275
    def __init__(self, logger, aspace: AddressSpace, whereclause):
276
        self.logger = logger
277
        self.elements = whereclause.Elements
278
        self._aspace = aspace
279
280
    def eval(self, event):
281
        if not self.elements:
282
            return True
283
        # spec says we should only evaluate first element, which may use other elements
284
        try:
285
            res = self._eval_el(0, event)
286
        except Exception as ex:
287
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
288
                                  ex)
289
            return False
290
        return res
291
292
    def _eval_el(self, index, event):
293
        el = self.elements[index]
294
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
295
        ops = el.FilterOperands  # just to make code more readable
296
        if el.FilterOperator == ua.FilterOperator.Equals:
297
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
298
        if el.FilterOperator == ua.FilterOperator.IsNull:
299
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
300
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
301
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
302
        if el.FilterOperator == ua.FilterOperator.LessThan:
303
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
304
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
305
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
306
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
307
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
308
        if el.FilterOperator == ua.FilterOperator.Like:
309
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
310
        if el.FilterOperator == ua.FilterOperator.Not:
311
            return not self._eval_op(ops[0], event)
312
        if el.FilterOperator == ua.FilterOperator.Between:
313
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
314
        if el.FilterOperator == ua.FilterOperator.InList:
315
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
316
        if el.FilterOperator == ua.FilterOperator.And:
317
            self.elements(ops[0].Index)
318
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
319
        if el.FilterOperator == ua.FilterOperator.Or:
320
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
321
        if el.FilterOperator == ua.FilterOperator.Cast:
322
            self.logger.warn("Cast operand not implemented, assuming True")
323
            return True
324
        if el.FilterOperator == ua.FilterOperator.OfType:
325
            return event.EventType == self._eval_op(ops[0], event)
326
        # TODO: implement missing operators
327
        self.logger.warning("WhereClause not implemented for element: %s", el)
328
        raise NotImplementedError
329
330
    def _like_operator(self, string, pattern):
331
        raise NotImplementedError
332
333
    def _eval_op(self, op, event):
334
        # seems spec says we should return Null if issues
335
        if isinstance(op, ua.ElementOperand):
336
            return self._eval_el(op.Index, event)
337
        if isinstance(op, ua.AttributeOperand):
338
            if op.BrowsePath:
339
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
340
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
341
            # FIXME: check, this is probably broken
342
        if isinstance(op, ua.SimpleAttributeOperand):
343
            if op.BrowsePath:
344
                # we only support depth of 1
345
                return getattr(event, op.BrowsePath[0].Name)
346
            # TODO: write code for index range.... but doe it make any sense
347
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
348
        if isinstance(op, ua.LiteralOperand):
349
            return op.Value.Value
350
        self.logger.warning("Where clause element % is not of a known type", op)
351
        raise NotImplementedError
352