Passed
Push — master ( 98f6af...5000a8 )
by Olivier
02:30
created

asyncua.sync   F

Complexity

Total Complexity 102

Size/Duplication

Total Lines 419
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 314
dl 0
loc 419
rs 2
c 0
b 0
f 0
wmc 102

81 Methods

Rating   Name   Duplication   Size   Complexity  
A Node.get_variables() 0 3 1
A Node.add_property() 0 3 1
A Node.get_node_class() 0 3 1
A ThreadLoop.__enter__() 0 3 1
A Server.start() 0 3 1
A Node.get_browse_name() 0 3 1
A Node.__hash__() 0 2 1
A Client.disconnect() 0 9 3
A Subscription.unsubscribe() 0 3 1
A Node.get_attributes() 0 3 1
A Node.get_children_descriptions() 0 8 1
A Client.get_node() 0 2 1
A Subscription.create_monitored_items() 0 3 1
A Client.__exit__() 0 2 1
A Node.set_writable() 0 3 1
A Server.__str__() 0 2 1
A Server.__enter__() 0 3 1
A Server.link_method() 0 2 1
A Server.set_server_name() 0 2 1
A Node.add_folder() 0 3 1
A Client.__enter__() 0 3 1
A Server.get_namespace_index() 0 3 1
A Client.create_subscription() 0 4 1
A Node.write_value() 0 3 1
A Server.stop() 0 4 2
A Client.connect() 0 3 1
A Server.set_endpoint() 0 2 1
A Node.add_object_type() 0 3 1
A Node.add_object() 0 3 1
A Subscription.delete() 0 3 1
A Server.disable_clock() 0 2 1
A Server.get_event_generator() 0 3 1
A Server.register_namespace() 0 3 1
A Server.__exit__() 0 2 1
A _SubHandler.event_notification() 0 2 1
A Subscription.subscribe_events() 0 9 1
A Client.connect_and_get_server_endpoints() 0 3 1
A Node.get_child() 0 3 1
A ThreadLoop.stop() 0 4 1
A Node.__str__() 0 2 1
A Server.__init__() 0 10 2
A EventGenerator.trigger() 0 2 1
A Node.__ne__() 0 2 1
A Node.get_description() 0 3 1
A ThreadLoop.run() 0 5 1
A Node.add_variable() 0 3 1
A Server.set_attribute_value() 0 2 1
A Client.load_enums() 0 3 1
A Shortcuts.__init__() 0 5 2
A Client.get_namespace_index() 0 3 1
A Node.__eq__() 0 2 1
A Server.load_type_definitions() 0 3 1
A Server.import_xml() 0 3 1
A Node.get_display_name() 0 3 1
A Server.set_security_policy() 0 2 1
A EventGenerator.__init__() 0 2 1
A EventGenerator.event() 0 3 1
A ThreadLoop.start() 0 4 2
A Subscription.__init__() 0 3 1
A ThreadLoop._notify_start() 0 3 2
A Node.read_value() 0 3 1
A Node.call_method() 0 3 1
A Server.load_enums() 0 3 1
A _SubHandler.datachange_notification() 0 2 1
A Client.set_security() 0 3 1
A Client.__init__() 0 9 2
A Node.__init__() 0 3 1
A Node.set_modelling_rule() 0 3 1
A Client.__str__() 0 2 1
A Subscription.subscribe_data_change() 0 3 1
A Client.load_type_definitions() 0 3 1
A ThreadLoop.__exit__() 0 2 1
A ThreadLoop.post() 0 5 3
A Node.get_path() 0 3 1
A Node.get_references() 0 9 1
A Node.get_children() 0 5 1
A _SubHandler.__init__() 0 3 1
A ThreadLoop.__init__() 0 4 1
A Server.get_node() 0 2 1
A Node.add_method() 0 3 1
A Node.nodeid() 0 3 1

1 Function

Rating   Name   Duplication   Size   Complexity  
C syncmethod() 0 22 11

How to fix   Complexity   

Complexity

Complex classes like asyncua.sync often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from concurrent.futures import CancelledError
6
from threading import Thread, Condition
7
import logging
8
9
from asyncua import ua
10
from asyncua import client
11
from asyncua import server
12
from asyncua.common import node
13
from asyncua.common import subscription, shortcuts
14
15
logger = logging.getLogger(__name__)
16
17
18
class ThreadLoopNotRunning(Exception):
    """Raised by ThreadLoop.post() when the event loop is missing or not running."""
20
21
22
class ThreadLoop(Thread):
    """Thread that owns an asyncio event loop and runs coroutines for other threads.

    Usable as a context manager: entering starts the thread and its loop,
    leaving stops both.
    """

    def __init__(self):
        super().__init__()
        # The loop is created inside run(), i.e. in the loop thread itself.
        self.loop = None
        self._cond = Condition()

    def _loop_is_running(self):
        """Return True once the event loop exists and is running."""
        return self.loop is not None and self.loop.is_running()

    def start(self):
        """Start the thread and block until its event loop is running."""
        with self._cond:
            super().start()
            # wait_for() re-checks the predicate after every wakeup, so a
            # spurious wakeup cannot make start() return before the loop runs.
            self._cond.wait_for(self._loop_is_running)

    def run(self):
        """Thread body: create the event loop and run it until stop() is called."""
        self.loop = asyncio.new_event_loop()
        logger.debug("Threadloop: %s", self.loop)
        # Scheduled as the first callback, so it fires once the loop is running.
        self.loop.call_soon_threadsafe(self._notify_start)
        self.loop.run_forever()

    def _notify_start(self):
        """Wake up the thread blocked in start(); runs inside the event loop."""
        with self._cond:
            self._cond.notify_all()

    def stop(self):
        """Ask the loop to stop, wait for the thread to finish, then close the loop."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
        self.loop.close()

    def post(self, coro):
        """Run *coro* in the loop thread and return its result, blocking the caller.

        Raises:
            ThreadLoopNotRunning: if the loop was never created or is not running.
        """
        if not self.loop or not self.loop.is_running():
            raise ThreadLoopNotRunning(f"could not post {coro}")
        future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return future.result()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_t, exc_v, trace):
        self.stop()
60
61
62
def _unwrap_sync_nodes(value):
    """Replace sync Node wrappers in *value* with their underlying asyncua objects.

    A Node is replaced by its ``aio_obj``. A list or tuple that contains Nodes
    is rebuilt with those items replaced. Anything else is returned unchanged.
    """
    if isinstance(value, Node):
        return value.aio_obj
    if isinstance(value, (list, tuple)) and any(isinstance(item, Node) for item in value):
        unwrapped = [item.aio_obj if isinstance(item, Node) else item for item in value]
        return unwrapped if isinstance(value, list) else tuple(unwrapped)
    return value


def _wrap_aio_result(tloop, result):
    """Wrap asyncua objects returned by a call into their sync counterparts."""
    if isinstance(result, node.Node):
        return Node(tloop, result)
    # Only the first element is inspected; the list is assumed homogeneous.
    if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
        return [Node(tloop, item) for item in result]
    if isinstance(result, server.event_generator.EventGenerator):
        return EventGenerator(result)
    if isinstance(result, subscription.Subscription):
        return Subscription(tloop, result)
    return result


def syncmethod(func):
    """Decorator turning a stub method into a blocking call of the async method.

    The decorated function's body is never executed; only its name is used to
    look up the method of the same name on ``self.aio_obj``. The resulting
    coroutine is posted to ``self.tloop`` and the caller blocks on its result.
    Sync Node arguments (also inside lists or tuples) are unwrapped before the
    call, and asyncua nodes, event generators and subscriptions in the result
    are wrapped in their sync classes.
    """
    # Local import keeps this block self-contained without touching file imports.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        aio_args = [_unwrap_sync_nodes(arg) for arg in args]
        aio_kwargs = {key: _unwrap_sync_nodes(val) for key, val in kwargs.items()}
        aio_func = getattr(self.aio_obj, func.__name__)
        result = self.tloop.post(aio_func(*aio_args, **aio_kwargs))
        return _wrap_aio_result(self.tloop, result)

    return wrapper
84
85
86
class _SubHandler:
87
    def __init__(self, tloop, sync_handler):
88
        self.tloop = tloop
89
        self.sync_handler = sync_handler
90
91
    def datachange_notification(self, node, val, data):
92
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
93
94
    def event_notification(self, event):
95
        self.sync_handler.event_notification(event)
96
97
98
class Client:
    """Blocking facade over ``asyncua.client.Client``.

    Calls are posted to a ThreadLoop and block until they complete. When no
    *tloop* is supplied, a private ThreadLoop is created and started here and
    stopped again by disconnect().
    """

    def __init__(self, url: str, timeout: int = 4, tloop=None):
        self.tloop = tloop
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
            self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
        except BaseException:
            # Construction failed: nobody will ever get a handle to stop the
            # private loop thread, so stop it here before propagating.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    @syncmethod
    def connect(self):
        pass

    def disconnect(self):
        """Disconnect the asyncua client and stop the private loop, if any.

        The posted disconnect is expected to end in CancelledError; a warning
        is logged when it does not. The private loop thread is stopped even
        if disconnecting fails with another exception.
        """
        try:
            self.tloop.post(self.aio_obj.disconnect())
            # an exception is expected; if it is not raised, log a warning
            logger.warning("Disconnect did not raise expected CancelledError.")
        except CancelledError:
            pass
        finally:
            # is_alive() guard: do not stop a loop thread that already ended
            # (e.g. on a second disconnect), which would mask the real error.
            if self.close_tloop and self.tloop.is_alive():
                self.tloop.stop()

    @syncmethod
    def load_type_definitions(self, nodes=None):
        pass

    @syncmethod
    def set_security(self):
        pass

    @syncmethod
    def load_enums(self):
        pass

    def create_subscription(self, period, handler):
        """Create a subscription whose notifications go to the sync *handler*."""
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
        aio_sub = self.tloop.post(coro)
        return Subscription(self.tloop, aio_sub)

    @syncmethod
    def get_namespace_index(self, url):
        pass

    def get_node(self, nodeid):
        """Return a sync Node wrapping the asyncua node for *nodeid*."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def connect_and_get_server_endpoints(self):
        pass

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
161
162
163
class Shortcuts:
    """Sync mirror of ``asyncua.common.shortcuts.Shortcuts``.

    Every instance attribute of the asyncua Shortcuts object is re-exposed
    under the same name, wrapped in a sync Node.
    """

    def __init__(self, tloop, aio_server):
        self.tloop = tloop
        self.aio_obj = shortcuts.Shortcuts(aio_server)
        aio_attributes = vars(self.aio_obj)
        for attr_name, aio_node in aio_attributes.items():
            sync_node = Node(self.tloop, aio_node)
            setattr(self, attr_name, sync_node)
169
170
171
class Server:
    """Blocking facade over ``asyncua.server.Server``.

    When no *tloop* is supplied, a private ThreadLoop is created and started
    here and stopped again by stop(). Methods decorated with ``syncmethod``
    are stubs: the call is forwarded to the asyncua method of the same name.
    The remaining methods call the asyncua server directly, without going
    through the loop thread.
    """

    def __init__(self, shelf_file=None, tloop=None):
        self.tloop = tloop
        self.close_tloop = False
        if not self.tloop:
            self.tloop = ThreadLoop()
            self.tloop.start()
            self.close_tloop = True
        try:
            self.aio_obj = server.Server(loop=self.tloop.loop)
            self.tloop.post(self.aio_obj.init(shelf_file))
            self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
        except BaseException:
            # Construction failed: nobody will ever get a handle to stop the
            # private loop thread, so stop it here before propagating.
            if self.close_tloop:
                self.tloop.stop()
            raise

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def set_endpoint(self, url):
        return self.aio_obj.set_endpoint(url)

    def set_server_name(self, name):
        return self.aio_obj.set_server_name(name)

    def set_security_policy(self, security_policy):
        return self.aio_obj.set_security_policy(security_policy)

    def disable_clock(self, boolean):
        return self.aio_obj.disable_clock(boolean)

    @syncmethod
    def register_namespace(self, url):
        pass

    @syncmethod
    def start(self):
        pass

    def stop(self):
        """Stop the asyncua server and the private loop, if any.

        The private loop thread is stopped even if stopping the server raises.
        """
        try:
            self.tloop.post(self.aio_obj.stop())
        finally:
            # is_alive() guard: do not stop a loop thread that already ended
            # (e.g. on a second stop), which would mask the real error.
            if self.close_tloop and self.tloop.is_alive():
                self.tloop.stop()

    def link_method(self, node, callback):
        return self.aio_obj.link_method(node, callback)

    @syncmethod
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        pass

    def get_node(self, nodeid):
        """Return a sync Node wrapping the asyncua node for *nodeid*."""
        return Node(self.tloop, self.aio_obj.get_node(nodeid))

    @syncmethod
    def import_xml(self, path=None, xmlstring=None):
        pass

    @syncmethod
    def get_namespace_index(self, url):
        pass

    @syncmethod
    def load_enums(self):
        pass

    @syncmethod
    def load_type_definitions(self):
        pass

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
247
248
249
class EventGenerator:
    """Thin wrapper that delegates to an asyncua event generator object."""

    def __init__(self, aio_evgen):
        self.aio_obj = aio_evgen

    @property
    def event(self):
        """The ``event`` attribute of the wrapped generator."""
        wrapped = self.aio_obj
        return wrapped.event

    def trigger(self, time=None, message=None):
        """Call ``trigger(time, message)`` on the wrapped generator and return its result."""
        wrapped = self.aio_obj
        outcome = wrapped.trigger(time, message)
        return outcome
259
260
261
class Node:
    """Blocking wrapper around an asyncua node.

    Methods decorated with ``syncmethod`` are stubs: their bodies are never
    executed, and the call is forwarded to the method of the same name on
    ``aio_obj`` and run through ``tloop``.
    """

    def __init__(self, tloop, aio_node):
        self.aio_obj = aio_node
        self.tloop = tloop

    def __eq__(self, other):
        # Comparing with a non-Node (e.g. None) must not raise AttributeError;
        # NotImplemented lets Python fall back to its default comparison.
        if not isinstance(other, Node):
            return NotImplemented
        return self.aio_obj == other.aio_obj

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __str__(self):
        return "Sync" + self.aio_obj.__str__()

    __repr__ = __str__

    def __hash__(self):
        return self.aio_obj.__hash__()

    @property
    def nodeid(self):
        """The ``nodeid`` of the wrapped asyncua node."""
        return self.aio_obj.nodeid

    @syncmethod
    def get_browse_name(self):
        pass

    @syncmethod
    def get_display_name(self):
        pass

    @syncmethod
    def get_children(
        self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified
    ):
        pass

    @syncmethod
    def get_children_descriptions(
        self,
        refs=ua.ObjectIds.HierarchicalReferences,
        nodeclassmask=ua.NodeClass.Unspecified,
        includesubtypes=True,
    ):
        pass

    @syncmethod
    def get_child(self, path):
        pass

    @syncmethod
    def set_modelling_rule(self, mandatory: bool):
        pass

    @syncmethod
    def add_variable(self, ns, name, val):
        pass

    @syncmethod
    def add_property(self, ns, name, val):
        pass

    @syncmethod
    def add_object(self, ns, name):
        pass

    @syncmethod
    def add_object_type(self, ns, name):
        pass

    @syncmethod
    def add_folder(self, ns, name):
        pass

    @syncmethod
    def add_method(self, *args):
        pass

    @syncmethod
    def set_writable(self, writable=True):
        pass

    @syncmethod
    def write_value(self, val):
        pass

    set_value = write_value  # legacy

    @syncmethod
    def read_value(self):
        pass

    get_value = read_value  # legacy

    @syncmethod
    def call_method(self, methodid, *args):
        pass

    @syncmethod
    def get_references(
        self,
        refs=ua.ObjectIds.References,
        direction=ua.BrowseDirection.Both,
        nodeclassmask=ua.NodeClass.Unspecified,
        includesubtypes=True,
    ):
        pass

    @syncmethod
    def get_description(self):
        pass

    @syncmethod
    def get_variables(self):
        pass

    @syncmethod
    def get_path(self):
        pass

    @syncmethod
    def get_node_class(self):
        pass

    @syncmethod
    def get_attributes(self):
        pass
388
389
class Subscription:
    """Blocking wrapper around an asyncua subscription.

    Every method is a ``syncmethod`` stub: the body is never executed, and the
    call is forwarded to the method of the same name on ``aio_obj``.
    """

    def __init__(self, tloop, sub):
        self.tloop = tloop
        self.aio_obj = sub

    @syncmethod
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
        pass

    @syncmethod
    def subscribe_events(
        self,
        sourcenode=ua.ObjectIds.Server,
        evtypes=ua.ObjectIds.BaseEventType,
        evfilter=None,
        queuesize=0,
    ):
        pass

    @syncmethod
    def unsubscribe(self, handle):
        pass

    # Declared as a plain ``def`` like every other stub: the decorator returns
    # a synchronous wrapper, so marking the stub ``async`` was misleading.
    @syncmethod
    def create_monitored_items(self, monitored_items):
        pass

    @syncmethod
    def delete(self):
        pass
419