Completed
Pull Request — master (#161)
by Denis
03:18
created

SubscriptionService.__init__()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1
Metric Value
cc 1
dl 0
loc 7
rs 9.4285
ccs 7
cts 7
cp 1
crap 1
1
"""
2
server side implementation of subscription service
3
"""
4
5 1
from threading import RLock
6 1
import logging
7
8 1
from opcua import ua
9 1
from opcua.server.internal_subscription import InternalSubscription
10
11
12 1
class SubscriptionService(object):
13
14 1
    def __init__(self, loop, aspace):
15 1
        self.logger = logging.getLogger(__name__)
16 1
        self.loop = loop
17 1
        self.aspace = aspace
18 1
        self.subscriptions = {}
19 1
        self._sub_id_counter = 77
20 1
        self._lock = RLock()
21
22 1
    def create_subscription(self, params, callback):
23 1
        self.logger.info("create subscription with callback: %s", callback)
24 1
        result = ua.CreateSubscriptionResult()
25 1
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
26 1
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
27 1
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
28 1
        with self._lock:
29 1
            self._sub_id_counter += 1
30 1
            result.SubscriptionId = self._sub_id_counter
31
32 1
            sub = InternalSubscription(self, result, self.aspace, callback)
33 1
            sub.start()
34 1
            self.subscriptions[result.SubscriptionId] = sub
35
36 1
            return result
37
38 1
    def delete_subscriptions(self, ids):
39 1
        self.logger.info("delete subscriptions: %s", ids)
40 1
        res = []
41 1
        for i in ids:
42 1
            with self._lock:
43 1
                if not i in self.subscriptions:
44
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
45
                else:
46 1
                    sub = self.subscriptions.pop(i)
47 1
                    sub.stop()
48 1
                    res.append(ua.StatusCode())
49 1
        return res
50
51 1
    def publish(self, acks):
52 1
        self.logger.info("publish request with acks %s", acks)
53 1
        with self._lock:
54 1
            for subid, sub in self.subscriptions.items():
55 1
                sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
56
57 1 View Code Duplication
    def create_monitored_items(self, params):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
58 1
        self.logger.info("create monitored items")
59 1
        with self._lock:
60 1
            if not params.SubscriptionId in self.subscriptions:
61
                res = []
62
                for _ in params.ItemsToCreate:
63
                    response = ua.MonitoredItemCreateResult()
64
                    response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
65
                    res.append(response)
66
                return res
67 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
68
69 1 View Code Duplication
    def modify_monitored_items(self, params):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
70
        self.logger.info("modify monitored items")
71
        with self._lock:
72
            if not params.SubscriptionId in self.subscriptions:
73
                res = []
74
                for _ in params.ItemsToModify:
75
                    result = ua.MonitoredItemModifyResult()
76
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
77
                    res.append(result)
78
                return res
79
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
80
81 1
    def delete_monitored_items(self, params):
82 1
        self.logger.info("delete monitored items")
83 1
        with self._lock:
84 1
            if not params.SubscriptionId in self.subscriptions:
85 1
                res = []
86 1
                for _ in params.MonitoredItemIds:
87 1
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
88 1
                return res
89 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(params.MonitoredItemIds)
90
91 1
    def republish(self, params):
92
        with self._lock:
93
            if not params.SubscriptionId in self.subscriptions:
94
                # what should I do?
95
                return ua.NotificationMessage()
96
            return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
97
98 1
    def trigger_event(self, event):
99
        with self._lock:
100
            for sub in self.subscriptions.values():
101
                sub.monitored_item_srv.trigger_event(event)
102