Completed
Pull Request — master (#638)
by
unknown
04:15
created

SubscriptionService.set_loop()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""
2
server side implementation of subscription service
3
"""
4
5 1
from threading import RLock
6 1
import logging
7
8 1
from opcua import ua
9 1
from opcua.common import utils
10 1
from opcua.server.internal_subscription import InternalSubscription
11
12
13 1
class SubscriptionService(object):
14
15 1
    def __init__(self, aspace):
16 1
        self.logger = logging.getLogger(__name__)
17 1
        self.loop = None
18 1
        self.aspace = aspace
19 1
        self.subscriptions = {}
20 1
        self._sub_id_counter = 77
21 1
        self._lock = RLock()
22
23 1
    def set_loop(self, loop):
24 1
        self.loop = loop
25
26 1
    def create_subscription(self, params, callback):
27 1
        self.logger.info("create subscription with callback: %s", callback)
28 1
        result = ua.CreateSubscriptionResult()
29 1
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
30 1
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
31 1
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
32 1
        with self._lock:
33 1
            self._sub_id_counter += 1
34 1
            result.SubscriptionId = self._sub_id_counter
35
36 1
            sub = InternalSubscription(self, result, self.aspace, callback)
37 1
            sub.start()
38 1
            self.subscriptions[result.SubscriptionId] = sub
39
40 1
            return result
41
42 1
    def modify_subscription(self, params, callback):
43
        # Requested params are ignored, result = params set during create_subscription.
44
        self.logger.info("modify subscription with callback: %s", callback)
45
        result = ua.ModifySubscriptionResult()
46
        try:
47
            with self._lock:
48
                sub = self.subscriptions[params.SubscriptionId]
49
                result.RevisedPublishingInterval = sub.data.RevisedPublishingInterval
50
                result.RevisedLifetimeCount = sub.data.RevisedLifetimeCount
51
                result.RevisedMaxKeepAliveCount = sub.data.RevisedMaxKeepAliveCount
52
53
                return result
54
        except KeyError:
55
            raise utils.ServiceError(ua.StatusCodes.BadSubscriptionIdInvalid)
56
57 1 View Code Duplication
    def delete_subscriptions(self, ids):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
58 1
        self.logger.info("delete subscriptions: %s", ids)
59 1
        res = []
60 1
        for i in ids:
61 1
            with self._lock:
62 1
                if i not in self.subscriptions:
63
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
64
                else:
65 1
                    sub = self.subscriptions.pop(i)
66 1
                    sub.stop()
67 1
                    res.append(ua.StatusCode())
68 1
        return res
69 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
70 1
    def publish(self, acks):
71 1
        self.logger.info("publish request with acks %s", acks)
72 1
        with self._lock:
73 1
            for subid, sub in self.subscriptions.items():
74 1
                sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
75
76 1
    def create_monitored_items(self, params):
77 1
        self.logger.info("create monitored items")
78 1
        with self._lock:
79 1
            if params.SubscriptionId not in self.subscriptions:
80
                res = []
81 View Code Duplication
                for _ in params.ItemsToCreate:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
82
                    response = ua.MonitoredItemCreateResult()
83
                    response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
84
                    res.append(response)
85
                return res
86 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
87
88 1
    def modify_monitored_items(self, params):
89
        self.logger.info("modify monitored items")
90
        with self._lock:
91
            if params.SubscriptionId not in self.subscriptions:
92
                res = []
93
                for _ in params.ItemsToModify:
94
                    result = ua.MonitoredItemModifyResult()
95
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
96
                    res.append(result)
97
                return res
98
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
99
100 1
    def delete_monitored_items(self, params):
101 1
        self.logger.info("delete monitored items")
102 1
        with self._lock:
103 1
            if params.SubscriptionId not in self.subscriptions:
104 1
                res = []
105 1
                for _ in params.MonitoredItemIds:
106 1
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
107 1
                return res
108 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
109
                params.MonitoredItemIds)
110
111 1
    def republish(self, params):
112
        with self._lock:
113
            if params.SubscriptionId not in self.subscriptions:
114
                # TODO: what should I do?
115
                return ua.NotificationMessage()
116
            return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
117
118 1
    def trigger_event(self, event):
119 1
        with self._lock:
120 1
            for sub in self.subscriptions.values():
121
                sub.monitored_item_srv.trigger_event(event)
122