Passed
Push — master ( 1a9d89...1da59b )
by Olivier
02:36
created

SubscriptionService.modify_subscription()   A

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.933

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 14
ccs 1
cts 12
cp 0.0833
crap 9.933
rs 9.4285
1
"""
2
Server-side implementation of the subscription service.
3
"""
4
5 1
from threading import RLock
6 1
import logging
7
8 1
from opcua import ua
9 1
from opcua.common import utils
10 1
from opcua.server.internal_subscription import InternalSubscription
11
12
13 1
class SubscriptionService(object):
    """
    Server-side registry of subscriptions, keyed by subscription id.

    The service creates, looks up and removes ``InternalSubscription``
    objects and forwards publish / monitored-item / event requests to them.
    Every access to ``self.subscriptions`` happens while holding
    ``self._lock`` (a re-entrant lock).
    """

    def __init__(self, loop, aspace):
        """
        :param loop: kept as ``self.loop``; this class only stores it
        :param aspace: address space passed to every subscription created
        """
        self.logger = logging.getLogger(__name__)
        self.loop = loop
        self.aspace = aspace
        # Maps SubscriptionId -> subscription object.
        self.subscriptions = {}
        # Incremented before each use, so the first id handed out is 78.
        self._sub_id_counter = 77
        self._lock = RLock()

    @staticmethod
    def _bad_subscription_results(result_cls, items):
        """
        Build one ``result_cls`` instance per entry of ``items``, each with
        its StatusCode set to BadSubscriptionIdInvalid.
        """
        results = []
        for _ in items:
            result = result_cls()
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
            results.append(result)
        return results

    def create_subscription(self, params, callback):
        """
        Create, start and register a new subscription.

        The requested publishing interval, lifetime count and keep-alive
        count are copied unchanged into the revised values of the result.

        :param params: object carrying the three ``Requested*`` attributes
        :param callback: passed on to the created subscription
        :return: the ``ua.CreateSubscriptionResult`` including the new id
        """
        self.logger.info("create subscription with callback: %s", callback)
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        with self._lock:
            self._sub_id_counter += 1
            result.SubscriptionId = self._sub_id_counter

            sub = InternalSubscription(self, result, self.aspace, callback)
            # Started before it becomes visible in the registry.
            sub.start()
            self.subscriptions[result.SubscriptionId] = sub

            return result

    def modify_subscription(self, params, callback):
        """
        Report the current settings of an existing subscription.

        The requested values in ``params`` are ignored; the result carries
        the revised values that were fixed by ``create_subscription``.

        :param params: object with a ``SubscriptionId`` attribute
        :param callback: only used in the log message
        :return: a ``ua.ModifySubscriptionResult``
        :raises utils.ServiceError: BadSubscriptionIdInvalid when the id is
            not registered
        """
        self.logger.info("modify subscription with callback: %s", callback)
        with self._lock:
            # Explicit lookup instead of a broad ``except KeyError`` so that
            # only a missing id is reported as BadSubscriptionIdInvalid.
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid)
            result = ua.ModifySubscriptionResult()
            result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval
            result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount
            result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount
            return result

    def delete_subscriptions(self, ids):
        """
        Stop and unregister the subscriptions named in ``ids``.

        :param ids: iterable of subscription ids
        :return: list with one ``ua.StatusCode`` per id, in the same order;
            BadSubscriptionIdInvalid for ids that are not registered, a
            default-constructed status code otherwise
        """
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        with self._lock:
            for sub_id in ids:
                sub = self.subscriptions.pop(sub_id, None)
                if sub is None:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                else:
                    sub.stop()
                    res.append(ua.StatusCode())
        return res

    def publish(self, acks):
        """
        Forward acknowledged sequence numbers to every subscription.

        Each registered subscription has ``publish`` called exactly once,
        with the list of sequence numbers acknowledged for its id (an empty
        list when there are none). Acks naming an unregistered subscription
        id are ignored.

        :param acks: iterable of objects with ``SubscriptionId`` and
            ``SequenceNumber`` attributes
        """
        self.logger.info("publish request with acks %s", acks)
        # Group once up front rather than rescanning acks per subscription.
        acked = {}
        for ack in acks:
            acked.setdefault(ack.SubscriptionId, []).append(ack.SequenceNumber)
        with self._lock:
            for subid, sub in self.subscriptions.items():
                sub.publish(acked.get(subid, []))

    def create_monitored_items(self, params):
        """
        Delegate monitored-item creation to the addressed subscription.

        :param params: object with ``SubscriptionId`` and ``ItemsToCreate``
        :return: whatever the subscription's monitored item service returns,
            or one ``ua.MonitoredItemCreateResult`` flagged
            BadSubscriptionIdInvalid per requested item when the
            subscription id is not registered
        """
        self.logger.info("create monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return self._bad_subscription_results(
                    ua.MonitoredItemCreateResult, params.ItemsToCreate)
            return sub.monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        """
        Delegate monitored-item modification to the addressed subscription.

        :param params: object with ``SubscriptionId`` and ``ItemsToModify``
        :return: whatever the subscription's monitored item service returns,
            or one ``ua.MonitoredItemModifyResult`` flagged
            BadSubscriptionIdInvalid per requested item when the
            subscription id is not registered
        """
        self.logger.info("modify monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return self._bad_subscription_results(
                    ua.MonitoredItemModifyResult, params.ItemsToModify)
            return sub.monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        """
        Delegate monitored-item deletion to the addressed subscription.

        :param params: object with ``SubscriptionId`` and ``MonitoredItemIds``
        :return: whatever the subscription's monitored item service returns,
            or one BadSubscriptionIdInvalid ``ua.StatusCode`` per item id
            when the subscription id is not registered
        """
        self.logger.info("delete monitored items")
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                return [ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                        for _ in params.MonitoredItemIds]
            return sub.monitored_item_srv.delete_monitored_items(params.MonitoredItemIds)

    def republish(self, params):
        """
        Ask the addressed subscription to republish a notification.

        :param params: object with ``SubscriptionId`` and
            ``RetransmitSequenceNumber``
        :return: the subscription's ``republish`` result, or an empty
            ``ua.NotificationMessage`` when the id is not registered
        """
        with self._lock:
            sub = self.subscriptions.get(params.SubscriptionId)
            if sub is None:
                # TODO: what should I do?
                # NOTE(review): reporting BadSubscriptionIdInvalid (as
                # modify_subscription does) is probably the right answer;
                # the empty message is kept so existing callers are not
                # broken - confirm against the OPC UA Republish service.
                return ua.NotificationMessage()
            return sub.republish(params.RetransmitSequenceNumber)

    def trigger_event(self, event):
        """
        Hand ``event`` to the monitored item service of every registered
        subscription.
        """
        with self._lock:
            for sub in self.subscriptions.values():
                sub.monitored_item_srv.trigger_event(event)
119