Passed
Pull Request — master (#365)
by
unknown
05:11
created

MonitoredItemData.__init__()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 1
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
"""
2
server side implementation of a subscription object
3
"""
4
5
import logging
6
from asyncua import ua
7
from typing import Dict
8
from .address_space import AddressSpace
9
10
11
class MonitoredItemData:
12
    def __init__(self):
13
        self.client_handle = None
14
        self.callback_handle = None
15
        self.monitored_item_id = None
16
        self.mode = None
17
        self.filter = None
18
        self.mvalue = MonitoredItemValues()
19
        self.where_clause_evaluator = None
20
        self.queue_size = 0
21
22
23
class MonitoredItemValues:
24
    def __init__(self):
25
        self.current_value = None
26
        self.old_value = None
27
28
    def set_current_value(self, cur_val):
29
        self.old_value = self.current_value
30
        self.current_value = cur_val
31
32
    def get_current_value(self):
33
        return self.current_value
34
35
    def get_old_value(self):
36
        return self.old_value
37
38
39
class MonitoredItemService:
40
    """
41
    Implements monitored item service for one subscription
42
    """
43
44
    def __init__(self, isub, aspace: AddressSpace):
45
        self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
46
        self.isub = isub
47
        self.aspace: AddressSpace = aspace
48
        self._monitored_items: Dict[int, MonitoredItemData] = {}
49
        self._monitored_events = {}
50
        self._monitored_datachange: Dict[int, int] = {}
51
        self._monitored_item_counter = 111
52
        self._events_with_retain = {}
53
        self._mid_with_retain = {}
54
55
    def __str__(self):
56
        return f"MonitoredItemService({self.isub.data.SubscriptionId})"
57
58
    def delete_all_monitored_items(self):
59
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
60
61
    async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
62
        results = []
63
        for item in params.ItemsToCreate:
64
            if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
65
                result = self._create_events_monitored_item(item)
66
            else:
67
                result = await self._create_data_change_monitored_item(item)
68
            results.append(result)
69
        return results
70
71
    def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
72
        results = []
73
        for item in params.ItemsToModify:
74
            results.append(self._modify_monitored_item(item))
75
        return results
76
77
    async def trigger_datachange(self, handle, nodeid, attr):
78
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
79
        dv = self.aspace.read_attribute_value(nodeid, attr)
80
        await self.datachange_callback(handle, dv)
81
82
    def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
83
        for mdata in self._monitored_items.values():
84
            result = ua.MonitoredItemModifyResult()
85
            if mdata.monitored_item_id == params.MonitoredItemId:
86
                result.RevisedSamplingInterval = params.RequestedParameters.SamplingInterval
87
                result.RevisedQueueSize = params.RequestedParameters.QueueSize
88
                if params.RequestedParameters.Filter is not None:
89
                    mdata.filter = params.RequestedParameters.Filter
90
                mdata.queue_size = params.RequestedParameters.QueueSize
91
                return result
92
        result = ua.MonitoredItemModifyResult()
93
        result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
94
        return result
95
96
    def _commit_monitored_item(self, result, mdata: MonitoredItemData):
97
        if result.StatusCode.is_good():
98
            self._monitored_items[result.MonitoredItemId] = mdata
99
100
    def _make_monitored_item_common(self, params):
101
        result = ua.MonitoredItemCreateResult()
102
        result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
103
        result.RevisedQueueSize = params.RequestedParameters.QueueSize
104
        self._monitored_item_counter += 1
105
        result.MonitoredItemId = self._monitored_item_counter
106
        self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)
107
        mdata = MonitoredItemData()
108
        mdata.mode = params.MonitoringMode
109
        mdata.client_handle = params.RequestedParameters.ClientHandle
110
        mdata.monitored_item_id = result.MonitoredItemId
111
        mdata.queue_size = params.RequestedParameters.QueueSize
112
        mdata.filter = params.RequestedParameters.Filter
113
        return result, mdata
114
115
    def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
116
        self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
117
                         params.ItemToMonitor.AttributeId)
118
119
        result, mdata = self._make_monitored_item_common(params)
120
        ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
121
                                                          ua.AttributeIds.EventNotifier).Value.Value
122
123
        if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
124
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
125
            return result
126
        # result.FilterResult = ua.EventFilterResult()  # spec says we can ignore if not error
127
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
128
        self._commit_monitored_item(result, mdata)
129
130
        def find_events(node_id):
131
            for ref in self.aspace.get(node_id).references:
132
                if ref.ReferenceTypeId == ua.NumericNodeId(41):
133
                    if ref.NodeId not in self._monitored_events:
134
                        self._monitored_events[ref.NodeId] = []
135
                    self._monitored_events[ref.NodeId].append(result.MonitoredItemId)
136
                elif ref.ReferenceTypeId == ua.NumericNodeId(48):
137
                    find_events(ref.NodeId)
138
139
        find_events(params.ItemToMonitor.NodeId)
140
        return result
141
142
    async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
143
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
144
                         params.ItemToMonitor.AttributeId)
145
146
        result, mdata = self._make_monitored_item_common(params)
147
        result.FilterResult = params.RequestedParameters.Filter
148
        result.StatusCode, handle = self.aspace.add_datachange_callback(
149
            params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId, self.datachange_callback)
150
151
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
152
        mdata.callback_handle = handle
153
        self._commit_monitored_item(result, mdata)
154
        if result.StatusCode.is_good():
155
            self._monitored_datachange[handle] = result.MonitoredItemId
156
            # force data change event generation
157
            await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
158
        return result
159
160
    def delete_monitored_items(self, ids):
161
        self.logger.debug("delete monitored items %s", ids)
162
        # with self._lock:
163
        results = []
164
        for mid in ids:
165
            results.append(self._delete_monitored_items(mid))
166
        return results
167
168
    def _delete_monitored_items(self, mid: int):
169
        if mid not in self._monitored_items:
170
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
171
        for k, v in self._monitored_events.items():
172
            if mid in v:
173
                v.remove(mid)
174
                if not v:
175
                    self._monitored_events.pop(k)
176
                break
177
        for k, v in self._monitored_datachange.items():
178
            if v == mid:
179
                self.aspace.delete_datachange_callback(k)
180
                self._monitored_datachange.pop(k)
181
                break
182
        self._monitored_items.pop(mid)
183
        return ua.StatusCode()
184
185
    async def datachange_callback(self, handle: int, value, error=None):
186
        if error:
187
            self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
188
                             handle, error)
189
            await self.trigger_statuschange(error)
190
        else:
191
            # self.logger.info(f"subscription {self}: datachange callback called "
192
            #                 f"with handle '{handle}' and value '{value.Value}'")
193
            event = ua.MonitoredItemNotification()
194
            mid = self._monitored_datachange[handle]
195
            mdata = self._monitored_items[mid]
196
            mdata.mvalue.set_current_value(value.Value.Value)
197
            if mdata.filter:
198
                deadband_flag_pass = self.deadband_callback(mdata.mvalue, mdata.filter)
199
            else:
200
                deadband_flag_pass = True
201
            if deadband_flag_pass:
202
                event.ClientHandle = mdata.client_handle
203
                event.Value = value
204
                await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
205
206
    def deadband_callback(self, values, flt):
207
        if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
208
            return True
209
        if flt.DeadbandType == ua.DeadbandType.Absolute and \
210
                ((abs(values.get_current_value() - values.get_old_value())) > flt.DeadbandValue):
211
            return True
212
        if flt.DeadbandType == ua.DeadbandType.Percent:
213
            self.logger.warning("DeadbandType Percent is not implemented !")
214
            return True
215
        return False
216
217
    async def trigger_event(self, event):
218
        if hasattr(event, 'Retain'):
219
            if event.Retain:
220
                self._events_with_retain[event.EventType] = event
221
            else:
222
                del self._events_with_retain[event.EventType]
223
        if event.emitting_node not in self._monitored_events:
224
            self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
225
            return False
226
        self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
227
        mids = self._monitored_events[event.EventType]
228
        for mid in mids:
229
            await self._trigger_event(event, mid)
230
        return True
231
232
    async def _trigger_event(self, event, mid: int):
233
        if mid not in self._monitored_items:
234
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
235
                              self)
236
            return
237
        if hasattr(event, 'Retain'):
238
            if event.Retain:
239
                if mid in self._mid_with_retain:
240
                    if event not in self._mid_with_retain[mid]:
241
                        self._mid_with_retain[mid].append(event)
242
                else:
243
                    self._mid_with_retain[mid] = [event]
244
            else:
245
                self._mid_with_retain[mid].remove(event)
246
                if not self._mid_with_retain[mid]:
247
                    del self._mid_with_retain[mid]
248
        mdata = self._monitored_items[mid]
249
        if not mdata.where_clause_evaluator.eval(event):
250
            self.logger.info("%s, %s, Event %s does not fit WhereClause, not generating event", self, mid, event)
251
            return
252
        fieldlist = ua.EventFieldList()
253
        fieldlist.ClientHandle = mdata.client_handle
254
        fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
255
        await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
256
257
    async def trigger_statuschange(self, code):
258
        await self.isub.enqueue_statuschange(code)
259
260
    async def condition_refresh(self):
261
        for event in self._events_with_retain.values():
262
            await self.trigger_event(event)
263
264
    def condition_refresh2(self, mid):
265
        if mid in self._mid_with_retain:
266
            for event in self._mid_with_retain[mid]:
267
                self._trigger_event(event, mid)
268
269
270
class WhereClauseEvaluator:
271
    def __init__(self, logger, aspace: AddressSpace, whereclause):
272
        self.logger = logger
273
        self.elements = whereclause.Elements
274
        self._aspace = aspace
275
276
    def eval(self, event):
277
        if not self.elements:
278
            return True
279
        # spec says we should only evaluate first element, which may use other elements
280
        try:
281
            res = self._eval_el(0, event)
282
        except Exception as ex:
283
            self.logger.exception("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event,
284
                                  ex)
285
            return False
286
        return res
287
288
    def _eval_el(self, index, event):
289
        el = self.elements[index]
290
        # ops = [self._eval_op(op, event) for op in el.FilterOperands]
291
        ops = el.FilterOperands  # just to make code more readable
292
        if el.FilterOperator == ua.FilterOperator.Equals:
293
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
294
        if el.FilterOperator == ua.FilterOperator.IsNull:
295
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
296
        if el.FilterOperator == ua.FilterOperator.GreaterThan:
297
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
298
        if el.FilterOperator == ua.FilterOperator.LessThan:
299
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
300
        if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
301
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
302
        if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
303
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
304
        if el.FilterOperator == ua.FilterOperator.Like:
305
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
306
        if el.FilterOperator == ua.FilterOperator.Not:
307
            return not self._eval_op(ops[0], event)
308
        if el.FilterOperator == ua.FilterOperator.Between:
309
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
310
        if el.FilterOperator == ua.FilterOperator.InList:
311
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
312
        if el.FilterOperator == ua.FilterOperator.And:
313
            self.elements(ops[0].Index)
314
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
315
        if el.FilterOperator == ua.FilterOperator.Or:
316
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
317
        if el.FilterOperator == ua.FilterOperator.Cast:
318
            self.logger.warn("Cast operand not implemented, assuming True")
319
            return True
320
        if el.FilterOperator == ua.FilterOperator.OfType:
321
            return event.EventType == self._eval_op(ops[0], event)
322
        # TODO: implement missing operators
323
        self.logger.warning("WhereClause not implemented for element: %s", el)
324
        raise NotImplementedError
325
326
    def _like_operator(self, string, pattern):
327
        raise NotImplementedError
328
329
    def _eval_op(self, op, event):
330
        # seems spec says we should return Null if issues
331
        if isinstance(op, ua.ElementOperand):
332
            return self._eval_el(op.Index, event)
333
        if isinstance(op, ua.AttributeOperand):
334
            if op.BrowsePath:
335
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
336
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
337
            # FIXME: check, this is probably broken
338
        if isinstance(op, ua.SimpleAttributeOperand):
339
            if op.BrowsePath:
340
                # we only support depth of 1
341
                return getattr(event, op.BrowsePath[0].Name)
342
            # TODO: write code for index range.... but doe it make any sense
343
            return self._aspace.read_attribute_value(event.EventType, op.AttributeId).Value.Value
344
        if isinstance(op, ua.LiteralOperand):
345
            return op.Value.Value
346
        self.logger.warning("Where clause element % is not of a known type", op)
347
        raise NotImplementedError
348