Passed
Push — master ( 86b214...45d609 )
by Olivier
02:41
created

asyncua.sync   F

Complexity

Total Complexity 96

Size/Duplication

Total Lines 393
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 293
dl 0
loc 393
rs 2
c 0
b 0
f 0
wmc 96

76 Methods

Rating   Name   Duplication   Size   Complexity  
A Client.get_node() 0 2 1
A Client.__exit__() 0 2 1
A Client.__enter__() 0 3 1
A Client.get_namespace_index() 0 3 1
A ThreadLoop.start() 0 4 2
A ThreadLoop.__init__() 0 4 1
A ThreadLoop.__enter__() 0 3 1
A Server.start() 0 3 1
A Client.disconnect() 0 4 2
A Server.__str__() 0 2 1
A Server.__enter__() 0 3 1
A Server.link_method() 0 2 1
A Server.set_server_name() 0 2 1
A Server.get_namespace_index() 0 3 1
A Client.create_subscription() 0 4 1
A Server.stop() 0 4 2
A Client.connect() 0 3 1
A Server.set_endpoint() 0 2 1
A Server.disable_clock() 0 2 1
A Server.get_event_generator() 0 3 1
A Server.register_namespace() 0 3 1
A Server.__exit__() 0 2 1
A _SubHandler.event_notification() 0 2 1
A ThreadLoop.stop() 0 2 1
A Server.__init__() 0 10 2
A EventGenerator.trigger() 0 2 1
A ThreadLoop.run() 0 5 1
A Server.set_attribute_value() 0 2 1
A Client.load_enums() 0 3 1
A Shortcuts.__init__() 0 5 2
A Server.load_type_definitions() 0 3 1
A Server.import_xml() 0 3 1
A Server.set_security_policy() 0 2 1
A EventGenerator.__init__() 0 2 1
A EventGenerator.event() 0 3 1
A ThreadLoop._notify_start() 0 3 2
A Server.load_enums() 0 3 1
A _SubHandler.datachange_notification() 0 2 1
A Client.__init__() 0 9 2
A Node.__init__() 0 3 1
A Client.__str__() 0 2 1
A Client.load_type_definitions() 0 3 1
A ThreadLoop.__exit__() 0 3 1
A ThreadLoop.post() 0 5 3
A _SubHandler.__init__() 0 3 1
A Server.get_node() 0 2 1
A Node.get_variables() 0 3 1
A Node.add_property() 0 3 1
A Node.get_browse_name() 0 3 1
A Node.__hash__() 0 2 1
A Subscription.unsubscribe() 0 3 1
A Node.get_children_descriptions() 0 8 1
A Subscription.create_monitored_items() 0 3 1
A Node.set_writable() 0 3 1
A Node.add_folder() 0 3 1
A Node.write_value() 0 3 1
A Node.add_object_type() 0 3 1
A Node.add_object() 0 3 1
A Subscription.delete() 0 3 1
A Subscription.subscribe_events() 0 9 1
A Node.get_child() 0 3 1
A Node.__str__() 0 2 1
A Node.__ne__() 0 2 1
A Node.get_description() 0 3 1
A Node.add_variable() 0 3 1
A Node.__eq__() 0 2 1
A Node.get_display_name() 0 3 1
A Subscription.__init__() 0 3 1
A Node.read_value() 0 3 1
A Node.call_method() 0 3 1
A Node.set_modelling_rule() 0 3 1
A Subscription.subscribe_data_change() 0 3 1
A Node.get_references() 0 9 1
A Node.get_children() 0 5 1
A Node.add_method() 0 3 1
A Node.nodeid() 0 3 1

1 Function

Rating   Name   Duplication   Size   Complexity  
C syncmethod() 0 22 11

How to fix   Complexity   

Complexity

Complex classes like asyncua.sync often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from threading import Thread, Condition
6
import logging
7
8
from asyncua import ua
9
from asyncua import client
10
from asyncua import server
11
from asyncua.common import node
12
from asyncua.common import subscription, shortcuts
13
14
logger = logging.getLogger(__name__)
15
16
17
class ThreadLoopNotRunning(Exception):
18
    pass
19
20
21
class ThreadLoop(Thread):
22
    def __init__(self):
23
        Thread.__init__(self)
24
        self.loop = None
25
        self._cond = Condition()
26
27
    def start(self):
28
        with self._cond:
29
            Thread.start(self)
30
            self._cond.wait()
31
32
    def run(self):
33
        self.loop = asyncio.new_event_loop()
34
        logger.debug("Threadloop: %s", self.loop)
35
        self.loop.call_soon_threadsafe(self._notify_start)
36
        self.loop.run_forever()
37
38
    def _notify_start(self):
39
        with self._cond:
40
            self._cond.notify_all()
41
42
    def stop(self):
43
        self.loop.call_soon_threadsafe(self.loop.stop)
44
45
    def post(self, coro):
46
        if not self.loop or not self.loop.is_running():
47
            raise ThreadLoopNotRunning(f"could not post {coro}")
48
        futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
49
        return futur.result()
50
51
    def __enter__(self):
52
        self.start()
53
        return self
54
55
    def __exit__(self, exc_t, exc_v, trace):
56
        self.stop()
57
        self.join()
58
59
60
def syncmethod(func):
61
    def wrapper(self, *args, **kwargs):
62
        args = list(args)  #FIXME: might be very inefficient...
63
        for idx, arg in enumerate(args):
64
            if isinstance(arg, Node):
65
                args[idx] = arg.aio_obj
66
        for k, v in kwargs.items():
67
            if isinstance(v, Node):
68
                kwargs[k] = v.aio_obj
69
        aio_func = getattr(self.aio_obj, func.__name__)
70
        result = self.tloop.post(aio_func(*args, **kwargs))
71
        if isinstance(result, node.Node):
72
            return Node(self.tloop, result)
73
        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
74
            return [Node(self.tloop, i) for i in result]
75
        if isinstance(result, server.event_generator.EventGenerator):
76
            return EventGenerator(result)
77
        if isinstance(result, subscription.Subscription):
78
            return Subscription(self.tloop, result)
79
        return result
80
81
    return wrapper
82
83
84
class _SubHandler:
85
    def __init__(self, tloop, sync_handler):
86
        self.tloop = tloop
87
        self.sync_handler = sync_handler
88
89
    def datachange_notification(self, node, val, data):
90
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
91
92
    def event_notification(self, event):
93
        self.sync_handler.event_notification(event)
94
95
96
class Client:
97
    def __init__(self, url: str, timeout: int = 4, tloop=None):
98
        self.tloop = tloop
99
        self.close_tloop = False
100
        if not self.tloop:
101
            self.tloop = ThreadLoop()
102
            self.tloop.start()
103
            self.close_tloop = True
104
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
105
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
106
107
    def __str__(self):
108
        return "Sync" + self.aio_obj.__str__()
109
    __repr__ = __str__
110
111
    @syncmethod
112
    def connect(self):
113
        pass
114
115
    def disconnect(self):
116
        self.tloop.post(self.aio_obj.disconnect())
117
        if self.close_tloop:
118
            self.tloop.stop()
119
120
    @syncmethod
121
    def load_type_definitions(self, nodes=None):
122
        pass
123
124
    @syncmethod
125
    def load_enums(self):
126
        pass
127
128
    def create_subscription(self, period, handler):
129
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
130
        aio_sub = self.tloop.post(coro)
131
        return Subscription(self.tloop, aio_sub)
132
133
    @syncmethod
134
    def get_namespace_index(self, url):
135
        pass
136
137
    def get_node(self, nodeid):
138
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
139
140
    def __enter__(self):
141
        self.connect()
142
        return self
143
144
    def __exit__(self, exc_type, exc_value, traceback):
145
        self.disconnect()
146
147
148
class Shortcuts:
149
    def __init__(self, tloop, aio_server):
150
        self.tloop = tloop
151
        self.aio_obj = shortcuts.Shortcuts(aio_server)
152
        for k, v in self.aio_obj.__dict__.items():
153
            setattr(self, k, Node(self.tloop, v))
154
155
156
class Server:
157
    def __init__(self, shelf_file=None, tloop=None):
158
        self.tloop = tloop
159
        self.close_tloop = False
160
        if not self.tloop:
161
            self.tloop = ThreadLoop()
162
            self.tloop.start()
163
            self.close_tloop = True
164
        self.aio_obj = server.Server(loop=self.tloop.loop)
165
        self.tloop.post(self.aio_obj.init(shelf_file))
166
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
167
168
    def __str__(self):
169
        return "Sync" + self.aio_obj.__str__()
170
    __repr__ = __str__
171
172
    def __enter__(self):
173
        self.start()
174
        return self
175
176
    def __exit__(self, exc_type, exc_value, traceback):
177
        self.stop()
178
179
    def set_endpoint(self, url):
180
        return self.aio_obj.set_endpoint(url)
181
182
    def set_server_name(self, name):
183
        return self.aio_obj.set_server_name(name)
184
185
    def set_security_policy(self, security_policy):
186
        return self.aio_obj.set_security_policy(security_policy)
187
188
    def disable_clock(self, boolean):
189
        return self.aio_obj.disable_clock(boolean)
190
191
    @syncmethod
192
    def register_namespace(self, url):
193
        return self.aio_obj.register_namespace(url)
194
195
    @syncmethod
196
    def start(self):
197
        pass
198
199
    def stop(self):
200
        self.tloop.post(self.aio_obj.stop())
201
        if self.close_tloop:
202
            self.tloop.stop()
203
204
    def link_method(self, node, callback):
205
        return self.aio_obj.link_method(node, callback)
206
207
    @syncmethod
208
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
209
        pass
210
211
    def get_node(self, nodeid):
212
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
213
214
    @syncmethod
215
    def import_xml(self, path=None, xmlstring=None):
216
        pass
217
218
    @syncmethod
219
    def get_namespace_index(self, url):
220
        pass
221
222
    @syncmethod
223
    def load_enums(self):
224
        pass
225
226
    @syncmethod
227
    def load_type_definitions(self):
228
        pass
229
230
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
231
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
232
233
234
class EventGenerator:
235
    def __init__(self, aio_evgen):
236
        self.aio_obj = aio_evgen
237
238
    @property
239
    def event(self):
240
        return self.aio_obj.event
241
242
    def trigger(self, time=None, message=None):
243
        return self.aio_obj.trigger(time, message)
244
245
246
class Node:
247
    def __init__(self, tloop, aio_node):
248
        self.aio_obj = aio_node
249
        self.tloop = tloop
250
251
    def __eq__(self, other):
252
        return self.aio_obj == other.aio_obj
253
254
    def __ne__(self, other):
255
        return not self.__eq__(other)
256
257
    def __str__(self):
258
        return "Sync" + self.aio_obj.__str__()
259
260
    __repr__ = __str__
261
262
    def __hash__(self):
263
        return self.aio_obj.__hash__()
264
265
    @property
266
    def nodeid(self):
267
        return self.aio_obj.nodeid
268
269
    @syncmethod
270
    def get_browse_name(self):
271
        pass
272
273
    @syncmethod
274
    def get_display_name(self):
275
        pass
276
277
    @syncmethod
278
    def get_children(
279
        self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified
280
    ):
281
        pass
282
283
    @syncmethod
284
    def get_children_descriptions(
285
        self,
286
        refs=ua.ObjectIds.HierarchicalReferences,
287
        nodeclassmask=ua.NodeClass.Unspecified,
288
        includesubtypes=True,
289
    ):
290
        pass
291
292
    @syncmethod
293
    def get_child(self, path):
294
        pass
295
296
    @syncmethod
297
    def set_modelling_rule(self, mandatory: bool):
298
        pass
299
300
    @syncmethod
301
    def add_variable(self, ns, name, val):
302
        pass
303
304
    @syncmethod
305
    def add_property(self, ns, name, val):
306
        pass
307
308
    @syncmethod
309
    def add_object(self, ns, name):
310
        pass
311
312
    @syncmethod
313
    def add_object_type(self, ns, name):
314
        pass
315
316
    @syncmethod
317
    def add_folder(self, ns, name):
318
        pass
319
320
    @syncmethod
321
    def add_method(self, *args):
322
        pass
323
324
    @syncmethod
325
    def set_writable(self, writable=True):
326
        pass
327
328
    @syncmethod
329
    def write_value(self, val):
330
        pass
331
332
    set_value = write_value  # legacy
333
334
    @syncmethod
335
    def read_value(self):
336
        pass
337
338
    get_value = read_value  # legacy
339
340
    @syncmethod
341
    def call_method(self, methodid, *args):
342
        pass
343
344
    @syncmethod
345
    def get_references(
346
        self,
347
        refs=ua.ObjectIds.References,
348
        direction=ua.BrowseDirection.Both,
349
        nodeclassmask=ua.NodeClass.Unspecified,
350
        includesubtypes=True,
351
    ):
352
        pass
353
354
    @syncmethod
355
    def get_description(self):
356
        pass
357
358
    @syncmethod
359
    def get_variables(self):
360
        pass
361
362
363
class Subscription:
364
    def __init__(self, tloop, sub):
365
        self.tloop = tloop
366
        self.aio_obj = sub
367
368
    @syncmethod
369
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
370
        pass
371
372
    @syncmethod
373
    def subscribe_events(
374
        self,
375
        sourcenode=ua.ObjectIds.Server,
376
        evtypes=ua.ObjectIds.BaseEventType,
377
        evfilter=None,
378
        queuesize=0,
379
    ):
380
        pass
381
382
    @syncmethod
383
    def unsubscribe(self, handle):
384
        pass
385
386
    @syncmethod
387
    async def create_monitored_items(self, monitored_items):
388
        pass
389
390
    @syncmethod
391
    def delete(self):
392
        pass
393