Completed
Pull Request — master (#161)
by Denis
03:18
created

SubscriptionService   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 90
Duplicated Lines 24.44 %

Test Coverage

Coverage 68.83%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 22
loc 90
ccs 53
cts 77
cp 0.6883
rs 10
wmc 30

9 Methods

Rating   Name   Duplication   Size   Complexity  
A create_subscription() 0 15 2
A republish() 0 6 3
A modify_monitored_items() 11 11 4
A create_monitored_items() 11 11 4
A delete_monitored_items() 0 9 4
A __init__() 0 7 1
A delete_subscriptions() 0 12 4
B publish() 0 5 5
A trigger_event() 0 4 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""
2
server side implementation of subscription service
3
"""
4
5 1
from threading import RLock
6 1
import logging
7
8 1
from opcua import ua
9 1
from opcua.server.internal_subscription import InternalSubscription
10
11
12 1
class SubscriptionService(object):
13
14 1
    def __init__(self, loop, aspace):
15 1
        self.logger = logging.getLogger(__name__)
16 1
        self.loop = loop
17 1
        self.aspace = aspace
18 1
        self.subscriptions = {}
19 1
        self._sub_id_counter = 77
20 1
        self._lock = RLock()
21
22 1
    def create_subscription(self, params, callback):
23 1
        self.logger.info("create subscription with callback: %s", callback)
24 1
        result = ua.CreateSubscriptionResult()
25 1
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
26 1
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
27 1
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
28 1
        with self._lock:
29 1
            self._sub_id_counter += 1
30 1
            result.SubscriptionId = self._sub_id_counter
31
32 1
            sub = InternalSubscription(self, result, self.aspace, callback)
33 1
            sub.start()
34 1
            self.subscriptions[result.SubscriptionId] = sub
35
36 1
            return result
37
38 1
    def delete_subscriptions(self, ids):
39 1
        self.logger.info("delete subscriptions: %s", ids)
40 1
        res = []
41 1
        for i in ids:
42 1
            with self._lock:
43 1
                if not i in self.subscriptions:
44
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
45
                else:
46 1
                    sub = self.subscriptions.pop(i)
47 1
                    sub.stop()
48 1
                    res.append(ua.StatusCode())
49 1
        return res
50
51 1
    def publish(self, acks):
52 1
        self.logger.info("publish request with acks %s", acks)
53 1
        with self._lock:
54 1
            for subid, sub in self.subscriptions.items():
55 1
                sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
56
57 1 View Code Duplication
    def create_monitored_items(self, params):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
58 1
        self.logger.info("create monitored items")
59 1
        with self._lock:
60 1
            if not params.SubscriptionId in self.subscriptions:
61
                res = []
62
                for _ in params.ItemsToCreate:
63
                    response = ua.MonitoredItemCreateResult()
64
                    response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
65
                    res.append(response)
66
                return res
67 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
68
69 1 View Code Duplication
    def modify_monitored_items(self, params):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
70
        self.logger.info("modify monitored items")
71
        with self._lock:
72
            if not params.SubscriptionId in self.subscriptions:
73
                res = []
74
                for _ in params.ItemsToModify:
75
                    result = ua.MonitoredItemModifyResult()
76
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
77
                    res.append(result)
78
                return res
79
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
80
81 1
    def delete_monitored_items(self, params):
82 1
        self.logger.info("delete monitored items")
83 1
        with self._lock:
84 1
            if not params.SubscriptionId in self.subscriptions:
85 1
                res = []
86 1
                for _ in params.MonitoredItemIds:
87 1
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
88 1
                return res
89 1
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(params.MonitoredItemIds)
90
91 1
    def republish(self, params):
92
        with self._lock:
93
            if not params.SubscriptionId in self.subscriptions:
94
                # what should I do?
95
                return ua.NotificationMessage()
96
            return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
97
98 1
    def trigger_event(self, event):
99
        with self._lock:
100
            for sub in self.subscriptions.values():
101
                sub.monitored_item_srv.trigger_event(event)
102