Completed
Pull Request — master (#179)
by Olivier
01:57
created

MonitoredItemService._create_monitored_item()   D

Complexity

Conditions 9

Size

Total Lines 50

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 40
CRAP Score 9

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 9
c 2
b 0
f 0
dl 0
loc 50
ccs 40
cts 40
cp 1
crap 9
rs 4
1
"""
2
server side implementation of a subscription object
3
"""
4
5 1
from threading import RLock
6 1
import logging
7 1
import copy
8
9 1
from opcua import ua
10
11
12 1
class MonitoredItemData(object):
    """
    Plain record holding the server side state of one monitored item.

    Every attribute starts as None and is filled in by the code that
    creates the monitored item.
    """

    def __init__(self):
        for attribute in (
            "client_handle",           # handle chosen by the client for this item
            "callback_handle",         # handle of the registered datachange callback
            "monitored_item_id",       # id assigned to the item by the server
            "parameters",              # result object carrying the revised parameters
            "mode",                    # requested monitoring mode
            "mfilter",                 # filter requested by the client
            "where_clause_evaluator",  # evaluator used to filter events
        ):
            setattr(self, attribute, None)
22
23 1
24
class MonitoredItemService(object):

    """
    implement monitoreditem service for 1 subscription

    Bookkeeping is done in three dictionaries, all protected by ``_lock``:

    * ``_monitored_items``: monitored item id -> MonitoredItemData
    * ``_monitored_events``: node id -> list of monitored item ids
      subscribed to the events of that node
    * ``_monitored_datachange``: datachange callback handle -> monitored
      item id
    """

    def __init__(self, isub, aspace):
        """
        isub is the owning subscription (notifications are enqueued on it),
        aspace is the address space used to read attributes and to register
        datachange callbacks.
        """
        self.logger = logging.getLogger(__name__ + "." + str(isub.data.SubscriptionId))
        self.isub = isub
        self.aspace = aspace
        self._lock = RLock()
        self._monitored_items = {}
        self._monitored_events = {}
        self._monitored_datachange = {}
        # last monitored item id handed out, incremented before each use
        self._monitored_item_counter = 111

    def delete_all_monitored_items(self):
        """Delete every monitored item currently known to this service."""
        self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])

    def create_monitored_items(self, params):
        """Create one monitored item per entry of params.ItemsToCreate and return the list of results."""
        return [self._create_monitored_item(item) for item in params.ItemsToCreate]

    def modify_monitored_items(self, params):
        """Modify one monitored item per entry of params.ItemsToModify and return the list of results."""
        return [self._modify_monitored_item(item) for item in params.ItemsToModify]

    def trigger_datachange(self, handle, nodeid, attr):
        """Read the current value of the attribute and push it through datachange_callback."""
        self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
        variant = self.aspace.get_attribute_value(nodeid, attr)
        self.datachange_callback(handle, variant)

    def _modify_monitored_item(self, params):
        """
        Apply the requested parameters to an existing monitored item.

        Returns a ua.MonitoredItemModifyResult. When params.MonitoredItemId
        is unknown, its StatusCode is set to BadMonitoredItemIdInvalid.
        """
        with self._lock:
            result = ua.MonitoredItemModifyResult()
            # _monitored_items is keyed by monitored item id, no need to scan it
            mdata = self._monitored_items.get(params.MonitoredItemId)
            if mdata is None:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
                return result
            # NOTE(review): the requested sampling interval is written to the
            # publishing interval of the whole subscription - confirm this is intended
            self.isub.data.RevisedPublishingInterval = params.RequestedParameters.SamplingInterval
            result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
            result.RevisedQueueSize = params.RequestedParameters.QueueSize
            result.FilterResult = params.RequestedParameters.Filter
            mdata.parameters = result
            return result

    def _create_monitored_item(self, params):
        """
        Create one monitored item and return its ua.MonitoredItemCreateResult.

        A request on the EventNotifier attribute creates an event
        subscription, any other attribute creates a datachange subscription.
        When the resulting status is not good the item is forgotten again
        and its id is given back to the counter.
        """
        with self._lock:
            result = ua.MonitoredItemCreateResult()
            result.RevisedSamplingInterval = self.isub.data.RevisedPublishingInterval
            result.RevisedQueueSize = params.RequestedParameters.QueueSize
            self._monitored_item_counter += 1
            result.MonitoredItemId = self._monitored_item_counter
            self.logger.debug("Creating MonitoredItem with id %s", result.MonitoredItemId)

            mdata = MonitoredItemData()
            mdata.parameters = result
            mdata.mode = params.MonitoringMode
            mdata.client_handle = params.RequestedParameters.ClientHandle
            mdata.mfilter = params.RequestedParameters.Filter
            mdata.monitored_item_id = result.MonitoredItemId

            # must be stored before subscribing: the initial datachange
            # notification looks the item up by id
            self._monitored_items[result.MonitoredItemId] = mdata

            if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
                self._subscribe_to_events(params, result, mdata)
            else:
                self._subscribe_to_datachange(params, result, mdata)

            if not result.StatusCode.is_good():
                del self._monitored_items[result.MonitoredItemId]
                self._monitored_item_counter -= 1

            return result

    def _subscribe_to_events(self, params, result, mdata):
        """Fill result and mdata for an event subscription; register the item only on success."""
        nodeid = params.ItemToMonitor.NodeId
        self.logger.info("request to subscribe to events for node %s and attribute %s", nodeid, params.ItemToMonitor.AttributeId)
        ev_notify_byte = self.aspace.get_attribute_value(nodeid, ua.AttributeIds.EventNotifier).Value.Value
        # the lowest bit of the EventNotifier byte must be set for the request to be accepted
        if ev_notify_byte is None or ev_notify_byte & 1 == 0:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
        result.FilterResult = ua.EventFilterResult()
        for _ in params.RequestedParameters.Filter.SelectClauses:
            result.FilterResult.SelectClauseResults.append(ua.StatusCode())
        # TODO: spec says we should check WhereClause here
        mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.mfilter.WhereClause)
        # a rejected item gives its id back to the counter, so it must not
        # leave an entry behind that the next item with that id would inherit
        if result.StatusCode.is_good():
            self._monitored_events.setdefault(nodeid, []).append(result.MonitoredItemId)

    def _subscribe_to_datachange(self, params, result, mdata):
        """Register a datachange callback for the item and, on success, send the initial value."""
        nodeid = params.ItemToMonitor.NodeId
        attr = params.ItemToMonitor.AttributeId
        self.logger.info("request to subscribe to datachange for node %s and attribute %s", nodeid, attr)
        result.FilterResult = params.RequestedParameters.Filter
        result.StatusCode, handle = self.aspace.add_datachange_callback(nodeid, attr, self.datachange_callback)
        self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
        mdata.callback_handle = handle
        if result.StatusCode.is_good():
            # only remember handles of callbacks that were really registered
            self._monitored_datachange[handle] = result.MonitoredItemId
            # force data change event generation
            self.trigger_datachange(handle, nodeid, attr)

    def delete_monitored_items(self, ids):
        """Delete the monitored items with the given ids and return one status code per id."""
        self.logger.debug("delete monitored items %s", ids)
        with self._lock:
            return [self._delete_monitored_items(mid) for mid in ids]

    def _delete_monitored_items(self, mid):
        """
        Remove one monitored item from all bookkeeping structures.

        Returns BadMonitoredItemIdInvalid when the id is unknown, a default
        ua.StatusCode otherwise. The caller is expected to hold the lock.
        """
        if mid not in self._monitored_items:
            return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
        # both loops stop right after changing the dictionary they iterate
        # over, so the iterator is never advanced after the mutation
        for nodeid, mids in self._monitored_events.items():
            if mid in mids:
                mids.remove(mid)
                if not mids:
                    self._monitored_events.pop(nodeid)
                break
        for handle, monitored_id in self._monitored_datachange.items():
            if monitored_id == mid:
                self.aspace.delete_datachange_callback(handle)
                self._monitored_datachange.pop(handle)
                break
        self._monitored_items.pop(mid)
        return ua.StatusCode()

    def datachange_callback(self, handle, value, error=None):
        """
        Callback given to the address space for datachange subscriptions.

        With an error, a status change is queued on the subscription.
        Otherwise a ua.MonitoredItemNotification carrying value is queued
        for the monitored item registered under handle.
        """
        if error:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'",
                             self, handle, error)
            self.trigger_statuschange(error)
        else:
            self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'",
                             self, handle, value.Value)
            event = ua.MonitoredItemNotification()
            with self._lock:
                mid = self._monitored_datachange[handle]
                mdata = self._monitored_items[mid]
                event.ClientHandle = mdata.client_handle
                event.Value = value
                self.isub.enqueue_datachange_event(mid, event, mdata.parameters.RevisedQueueSize)

    def trigger_event(self, event):
        """
        Dispatch event to every monitored item subscribed to event.SourceNode.

        Returns False when nothing is subscribed to that node, None otherwise.
        """
        with self._lock:
            if event.SourceNode not in self._monitored_events:
                self.logger.debug("%s has no subscription for events %s from node: %s",
                                  self, event, event.SourceNode)
                return False
            self.logger.debug("%s has subscription for events %s from node: %s",
                              self, event, event.SourceNode)
            mids = self._monitored_events[event.SourceNode]
            for mid in mids:
                self._trigger_event(event, mid)

    def _trigger_event(self, event, mid):
        """Queue event for monitored item mid unless the item is unknown or its where clause rejects it."""
        if mid not in self._monitored_items:
            self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
            return
        mdata = self._monitored_items[mid]
        if not mdata.where_clause_evaluator.eval(event):
            self.logger.debug("Event %s does not fit WhereClause of monitored item %s in subscription %s, not generating event", event, mid, self)
            return
        fieldlist = ua.EventFieldList()
        fieldlist.ClientHandle = mdata.client_handle
        fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
        self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)

    def _get_event_fields(self, evfilter, event):
        """
        Build the list of variants requested by evfilter.SelectClauses.

        Each clause is resolved to an attribute name of event; a deep copy
        of the attribute value is wrapped in a ua.Variant. A clause whose
        lookup raises AttributeError yields an empty ua.Variant so that the
        result always has one entry per clause.
        """
        fields = []
        for sattr in evfilter.SelectClauses:
            try:
                if not sattr.BrowsePath:
                    name = sattr.Attribute.name
                else:
                    # only the first element of the browse path is used
                    name = sattr.BrowsePath[0].Name
                val = copy.deepcopy(getattr(event, name))
                fields.append(ua.Variant(val))
            except AttributeError:
                fields.append(ua.Variant())
        return fields

    def trigger_statuschange(self, code):
        """Queue a status change with the given code on the owning subscription."""
        self.isub.enqueue_statuschange(code)
211 1
212 1
213 1
class InternalSubscription(object):
    """
    Server side state of one subscription.

    Notifications are queued by the monitored item service and periodically
    packed into a ua.PublishResult that is handed to callback.
    """

    def __init__(self, subservice, data, addressspace, callback):
        """
        subservice provides the loop used to schedule publishing, data holds
        the revised subscription parameters, addressspace is forwarded to the
        monitored item service and callback receives every publish result.
        """
        self.logger = logging.getLogger(__name__ + "." + str(data.SubscriptionId))
        self.aspace = addressspace
        self.subservice = subservice
        self.data = data
        self.callback = callback
        self.monitored_item_srv = MonitoredItemService(self, addressspace)
        self.task = None
        self._lock = RLock()
        # pending notifications, keyed by monitored item id
        self._triggered_datachanges = {}
        self._triggered_events = {}
        self._triggered_statuschanges = []
        self._notification_seq = 1
        # sent results not yet acknowledged, keyed by sequence number
        self._not_acknowledged_results = {}
        self._startup = True
        self._keep_alive_count = 0
        self._publish_cycles_count = 0
        self._stopev = False

    def __str__(self):
        return "Subscription(id:{})".format(self.data.SubscriptionId)

    def start(self):
        """Schedule the first publishing cycle."""
        self.logger.debug("starting subscription %s", self.data.SubscriptionId)
        self._subscription_loop()

    def stop(self):
        """Stop the publishing cycles and delete all monitored items."""
        self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
        self._stopev = True
        self.monitored_item_srv.delete_all_monitored_items()

    def _subscription_loop(self):
        """Schedule the next call of _sub_loop after one publishing interval, unless stopped."""
        #self.logger.debug("%s loop", self)
        if not self._stopev:
            # RevisedPublishingInterval is divided by 1000 before being passed as the delay
            self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)

    def _sub_loop(self):
        """One publishing cycle: publish pending results, then schedule the next cycle."""
        if self._stopev:
            return
        self.publish_results()
        self._subscription_loop()

    def has_published_results(self):
        """
        Tell whether a publish result should be sent during this cycle.

        True on the first cycle, when notifications (datachanges, events or
        status changes) are pending, or when the keep alive count exceeded
        RevisedMaxKeepAliveCount. Otherwise the keep alive count is
        incremented and False is returned.
        """
        with self._lock:
            if self._startup or self._triggered_datachanges or self._triggered_events:
                return True
            # queued status changes are notifications too and must not wait
            # for the next keep alive message
            if self._triggered_statuschanges:
                return True
            if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
                self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event", self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
                return True
            self._keep_alive_count += 1
            return False

    def publish_results(self):
        """
        Send pending notifications to the callback.

        When more cycles than RevisedLifetimeCount went by without a call
        to publish(), a BadTimeout status change is queued and the
        subscription is flagged as stopped.
        """
        if self._publish_cycles_count > self.data.RevisedLifetimeCount:
            self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
            # FIXME this will never be send since we do not have publish request anyway
            self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
            self._stopev = True
        result = None
        with self._lock:
            if self.has_published_results():  # FIXME: should we pop a publish request here? or we do not care?
                self._publish_cycles_count += 1
                result = self._pop_publish_result()
        # the callback is called outside of the lock
        if result is not None:
            self.callback(result)

    def _pop_publish_result(self):
        """
        Move all pending notifications into a new ua.PublishResult.

        A result carrying notification data consumes a sequence number and
        is kept until acknowledged; an empty one reuses the current
        sequence number and is not stored.
        """
        result = ua.PublishResult()
        result.SubscriptionId = self.data.SubscriptionId
        self._pop_triggered_datachanges(result)
        self._pop_triggered_events(result)
        self._pop_triggered_statuschanges(result)
        self._keep_alive_count = 0
        self._startup = False
        result.NotificationMessage.SequenceNumber = self._notification_seq
        if len(result.NotificationMessage.NotificationData) != 0:
            self._notification_seq += 1
            self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
        result.MoreNotifications = False
        result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
        return result

    def _pop_triggered_datachanges(self, result):
        """Append all pending datachanges to result as one notification and clear the queue."""
        if self._triggered_datachanges:
            notif = ua.DataChangeNotification()
            notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
            self._triggered_datachanges = {}
            self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
            result.NotificationMessage.NotificationData.append(notif)

    def _pop_triggered_events(self, result):
        """Append all pending events to result as one notification and clear the queue."""
        if self._triggered_events:
            notif = ua.EventNotificationList()
            notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
            self._triggered_events = {}
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification with %s events", len(notif.Events))

    def _pop_triggered_statuschanges(self, result):
        """Append the oldest pending status change, if any, to result."""
        if self._triggered_statuschanges:
            notif = ua.StatusChangeNotification()
            # only one status change is sent per publish result
            notif.Status = self._triggered_statuschanges.pop(0)
            result.NotificationMessage.NotificationData.append(notif)
            self.logger.debug("sending event notification %s", notif.Status)

    def publish(self, acks):
        """Handle a publish request: reset the lifetime counter and drop the acknowledged results."""
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            self._publish_cycles_count = 0
            for nb in acks:
                # unknown sequence numbers are silently ignored
                self._not_acknowledged_results.pop(nb, None)

    def republish(self, nb):
        """
        Return the notification message sent with sequence number nb.

        An empty ua.NotificationMessage is returned when no unacknowledged
        result has that sequence number.
        """
        self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
        with self._lock:
            if nb in self._not_acknowledged_results:
                self.logger.info("re-publishing ack %s in subscription %s", nb, self)
                return self._not_acknowledged_results[nb].NotificationMessage
            else:
                self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
                return ua.NotificationMessage()

    def enqueue_datachange_event(self, mid, eventdata, maxsize):
        """Queue a datachange notification for monitored item mid."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)

    def enqueue_event(self, mid, eventdata, maxsize):
        """Queue an event notification for monitored item mid."""
        self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)

    def enqueue_statuschange(self, code):
        """Queue a status change notification."""
        self._triggered_statuschanges.append(code)

    def _enqueue_event(self, mid, eventdata, size, queue):
        """
        Append eventdata to the list stored under mid in queue.

        When size is not 0 and the list already holds size entries, the
        oldest entry is discarded first. A size of 0 means no limit.
        """
        if mid not in queue:
            queue[mid] = [eventdata]
            return
        if size != 0:
            if len(queue[mid]) >= size:
                queue[mid].pop(0)
        queue[mid].append(eventdata)
355
356
357
class WhereClauseEvaluator(object):
    """
    Evaluate the where clause of an event filter against events.

    The elements of the where clause are kept as given; evaluation starts
    at the first element, which may refer to the other elements through
    element operands.
    """

    def __init__(self, logger, aspace, whereclause):
        """
        logger is used to report evaluation problems, aspace is the address
        space used to read attribute values and whereclause is the filter
        whose Elements are evaluated.
        """
        self.logger = logger
        self.elements = whereclause.Elements
        self._aspace = aspace

    def eval(self, event):
        """
        Return the result of the where clause for event.

        An empty where clause accepts every event. Any exception raised
        during evaluation is logged and the event is rejected.
        """
        if not self.elements:
            return True
        # spec says we should only evaluate first element, which may use other elements
        try:
            res = self._eval_el(0, event)
        except Exception as ex:
            self.logger.warning("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event, ex)
            return False
        return res

    def _eval_el(self, index, event):
        """
        Evaluate the filter element stored at position index.

        Operands are resolved with _eval_op. Raises NotImplementedError for
        operators that are not supported.
        """
        el = self.elements[index]
        ops = el.FilterOperands  # just to make code more readable
        if el.FilterOperator == ua.FilterOperator.Equals:
            return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.IsNull:
            return self._eval_op(ops[0], event) is None  # FIXME: might be too strict
        elif el.FilterOperator == ua.FilterOperator.GreaterThan:
            return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.LessThan:
            return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
            return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
            return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Like:
            return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
        elif el.FilterOperator == ua.FilterOperator.Not:
            return not self._eval_op(ops[0], event)
        elif el.FilterOperator == ua.FilterOperator.Between:
            # true when ops[1] <= ops[0] <= ops[2]
            return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.InList:
            return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
        elif el.FilterOperator == ua.FilterOperator.And:
            return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Or:
            return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
        elif el.FilterOperator == ua.FilterOperator.Cast:
            self.logger.warning("Cast operand not implemented")
            raise NotImplementedError
        else:
            # TODO: implement missing operators
            self.logger.warning("WhereClause not implemented for element: %s", el)
            raise NotImplementedError

    def _like_operator(self, string, pattern):
        """Placeholder for the Like operator, always raises NotImplementedError."""
        raise NotImplementedError

    def _eval_op(self, op, event):
        """
        Resolve one filter operand to a value.

        * element operand: result of the element it points to
        * attribute operands: attribute of event named by the browse path,
          or the value read from the address space for event.EventType
          when there is no browse path
        * literal operand: the literal value

        Raises NotImplementedError for any other operand type.
        """
        # seems spec says we should return Null if issues
        if isinstance(op, ua.ElementOperand):
            # an element operand holds the index of another element of the clause
            return self._eval_el(op.Index, event)
        elif isinstance(op, ua.AttributeOperand):
            # FIXME: check, this is probably broken
            if op.BrowsePath:
                return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
            else:
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.SimpleAttributeOperand):
            if op.BrowsePath:
                # we only support depth of 1
                return getattr(event, op.BrowsePath[0].Name)
            else:
                # TODO: write code for index range.... but does it make any sense
                return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
        elif isinstance(op, ua.LiteralOperand):
            return op.Value.Value
        else:
            self.logger.warning("Where clause element %s is not of a known type", op)
            raise NotImplementedError
437
438
439
440