Passed
Push — master ( 8ac57c...99ee95 )
by Olivier
02:51
created

asyncua.sync.Client.__del__()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""
2
sync API of asyncua
3
"""
4
import asyncio
5
from concurrent.futures import CancelledError
6
from threading import Thread, Condition
7
import logging
8
9
from asyncua import ua
10
from asyncua import client
11
from asyncua import server
12
from asyncua.common import node
13
from asyncua.common import subscription, shortcuts
14
15
logger = logging.getLogger(__name__)
16
17
18
class ThreadLoopNotRunning(Exception):
19
    pass
20
21
22
class ThreadLoop(Thread):
23
    def __init__(self):
24
        Thread.__init__(self)
25
        self.loop = None
26
        self._cond = Condition()
27
28
    def start(self):
29
        with self._cond:
30
            Thread.start(self)
31
            self._cond.wait()
32
33
    def run(self):
34
        self.loop = asyncio.new_event_loop()
35
        logger.debug("Threadloop: %s", self.loop)
36
        self.loop.call_soon_threadsafe(self._notify_start)
37
        self.loop.run_forever()
38
39
    def _notify_start(self):
40
        with self._cond:
41
            self._cond.notify_all()
42
43
    def stop(self):
44
        self.loop.call_soon_threadsafe(self.loop.stop)
45
        self.join()
46
        self.loop.close()
47
48
    def post(self, coro):
49
        if not self.loop or not self.loop.is_running():
50
            raise ThreadLoopNotRunning(f"could not post {coro}")
51
        futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
52
        return futur.result()
53
54
    def __enter__(self):
55
        self.start()
56
        return self
57
58
    def __exit__(self, exc_t, exc_v, trace):
59
        self.stop()
60
61
62
def syncmethod(func):
63
    def wrapper(self, *args, **kwargs):
64
        args = list(args)  #FIXME: might be very inefficient...
65
        for idx, arg in enumerate(args):
66
            if isinstance(arg, Node):
67
                args[idx] = arg.aio_obj
68
        for k, v in kwargs.items():
69
            if isinstance(v, Node):
70
                kwargs[k] = v.aio_obj
71
        aio_func = getattr(self.aio_obj, func.__name__)
72
        result = self.tloop.post(aio_func(*args, **kwargs))
73
        if isinstance(result, node.Node):
74
            return Node(self.tloop, result)
75
        if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
76
            return [Node(self.tloop, i) for i in result]
77
        if isinstance(result, server.event_generator.EventGenerator):
78
            return EventGenerator(result)
79
        if isinstance(result, subscription.Subscription):
80
            return Subscription(self.tloop, result)
81
        return result
82
83
    return wrapper
84
85
86
class _SubHandler:
87
    def __init__(self, tloop, sync_handler):
88
        self.tloop = tloop
89
        self.sync_handler = sync_handler
90
91
    def datachange_notification(self, node, val, data):
92
        self.sync_handler.datachange_notification(Node(self.tloop, node), val, data)
93
94
    def event_notification(self, event):
95
        self.sync_handler.event_notification(event)
96
97
98
class Client:
99
    def __init__(self, url: str, timeout: int = 4, tloop=None):
100
        self.tloop = tloop
101
        self.close_tloop = False
102
        if not self.tloop:
103
            self.tloop = ThreadLoop()
104
            self.tloop.start()
105
            self.close_tloop = True
106
        self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
107
        self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
108
109
    def __str__(self):
110
        return "Sync" + self.aio_obj.__str__()
111
    __repr__ = __str__
112
113
    def __del__(self):
114
        """Delete method to ensure that the ThreadLoop is stopped."""
115
        if self.close_tloop:
116
            self.tloop.stop()
117
118
    @syncmethod
119
    def connect(self):
120
        pass
121
122
    def disconnect(self):
123
        try:
124
            self.tloop.post(self.aio_obj.disconnect())
125
            # an exception is expected, if it is not raise log a warning
126
            logger.warning("Disconnect did not raise expected CancelledError.")
127
        except CancelledError:
128
            pass
129
130
    @syncmethod
131
    def load_type_definitions(self, nodes=None):
132
        pass
133
134
    @syncmethod
135
    def set_security(self):
136
        pass
137
138
    @syncmethod
139
    def load_enums(self):
140
        pass
141
142
    def create_subscription(self, period, handler):
143
        coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler))
144
        aio_sub = self.tloop.post(coro)
145
        return Subscription(self.tloop, aio_sub)
146
147
    @syncmethod
148
    def get_namespace_index(self, url):
149
        pass
150
151
    def get_node(self, nodeid):
152
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
153
154
    @syncmethod
155
    def connect_and_get_server_endpoints(self):
156
        pass
157
158
    def __enter__(self):
159
        self.connect()
160
        return self
161
162
    def __exit__(self, exc_type, exc_value, traceback):
163
        self.disconnect()
164
165
166
class Shortcuts:
167
    def __init__(self, tloop, aio_server):
168
        self.tloop = tloop
169
        self.aio_obj = shortcuts.Shortcuts(aio_server)
170
        for k, v in self.aio_obj.__dict__.items():
171
            setattr(self, k, Node(self.tloop, v))
172
173
174
class Server:
175
    def __init__(self, shelf_file=None, tloop=None):
176
        self.tloop = tloop
177
        self.close_tloop = False
178
        if not self.tloop:
179
            self.tloop = ThreadLoop()
180
            self.tloop.start()
181
            self.close_tloop = True
182
        self.aio_obj = server.Server(loop=self.tloop.loop)
183
        self.tloop.post(self.aio_obj.init(shelf_file))
184
        self.nodes = Shortcuts(self.tloop, self.aio_obj.iserver.isession)
185
186
    def __str__(self):
187
        return "Sync" + self.aio_obj.__str__()
188
    __repr__ = __str__
189
190
    def __enter__(self):
191
        self.start()
192
        return self
193
194
    def __exit__(self, exc_type, exc_value, traceback):
195
        self.stop()
196
197
    def set_endpoint(self, url):
198
        return self.aio_obj.set_endpoint(url)
199
200
    def set_server_name(self, name):
201
        return self.aio_obj.set_server_name(name)
202
203
    def set_security_policy(self, security_policy):
204
        return self.aio_obj.set_security_policy(security_policy)
205
206
    def disable_clock(self, boolean):
207
        return self.aio_obj.disable_clock(boolean)
208
209
    @syncmethod
210
    def register_namespace(self, url):
211
        return self.aio_obj.register_namespace(url)
212
213
    @syncmethod
214
    def start(self):
215
        pass
216
217
    def stop(self):
218
        self.tloop.post(self.aio_obj.stop())
219
        if self.close_tloop:
220
            self.tloop.stop()
221
222
    def link_method(self, node, callback):
223
        return self.aio_obj.link_method(node, callback)
224
225
    @syncmethod
226
    def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
227
        pass
228
229
    def get_node(self, nodeid):
230
        return Node(self.tloop, self.aio_obj.get_node(nodeid))
231
232
    @syncmethod
233
    def import_xml(self, path=None, xmlstring=None):
234
        pass
235
236
    @syncmethod
237
    def get_namespace_index(self, url):
238
        pass
239
240
    @syncmethod
241
    def load_enums(self):
242
        pass
243
244
    @syncmethod
245
    def load_type_definitions(self):
246
        pass
247
248
    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
249
        return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
250
251
252
class EventGenerator:
253
    def __init__(self, aio_evgen):
254
        self.aio_obj = aio_evgen
255
256
    @property
257
    def event(self):
258
        return self.aio_obj.event
259
260
    def trigger(self, time=None, message=None):
261
        return self.aio_obj.trigger(time, message)
262
263
264
class Node:
265
    def __init__(self, tloop, aio_node):
266
        self.aio_obj = aio_node
267
        self.tloop = tloop
268
269
    def __eq__(self, other):
270
        return self.aio_obj == other.aio_obj
271
272
    def __ne__(self, other):
273
        return not self.__eq__(other)
274
275
    def __str__(self):
276
        return "Sync" + self.aio_obj.__str__()
277
278
    __repr__ = __str__
279
280
    def __hash__(self):
281
        return self.aio_obj.__hash__()
282
283
    @property
284
    def nodeid(self):
285
        return self.aio_obj.nodeid
286
287
    @syncmethod
288
    def get_browse_name(self):
289
        pass
290
291
    @syncmethod
292
    def get_display_name(self):
293
        pass
294
295
    @syncmethod
296
    def get_children(
297
        self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified
298
    ):
299
        pass
300
301
    @syncmethod
302
    def get_children_descriptions(
303
        self,
304
        refs=ua.ObjectIds.HierarchicalReferences,
305
        nodeclassmask=ua.NodeClass.Unspecified,
306
        includesubtypes=True,
307
    ):
308
        pass
309
310
    @syncmethod
311
    def get_child(self, path):
312
        pass
313
314
    @syncmethod
315
    def set_modelling_rule(self, mandatory: bool):
316
        pass
317
318
    @syncmethod
319
    def add_variable(self, ns, name, val):
320
        pass
321
322
    @syncmethod
323
    def add_property(self, ns, name, val):
324
        pass
325
326
    @syncmethod
327
    def add_object(self, ns, name):
328
        pass
329
330
    @syncmethod
331
    def add_object_type(self, ns, name):
332
        pass
333
334
    @syncmethod
335
    def add_folder(self, ns, name):
336
        pass
337
338
    @syncmethod
339
    def add_method(self, *args):
340
        pass
341
342
    @syncmethod
343
    def set_writable(self, writable=True):
344
        pass
345
346
    @syncmethod
347
    def write_value(self, val):
348
        pass
349
350
    set_value = write_value  # legacy
351
352
    @syncmethod
353
    def read_value(self):
354
        pass
355
356
    get_value = read_value  # legacy
357
358
    @syncmethod
359
    def call_method(self, methodid, *args):
360
        pass
361
362
    @syncmethod
363
    def get_references(
364
        self,
365
        refs=ua.ObjectIds.References,
366
        direction=ua.BrowseDirection.Both,
367
        nodeclassmask=ua.NodeClass.Unspecified,
368
        includesubtypes=True,
369
    ):
370
        pass
371
372
    @syncmethod
373
    def get_description(self):
374
        pass
375
376
    @syncmethod
377
    def get_variables(self):
378
        pass
379
380
    @syncmethod
381
    def get_path(self):
382
        pass
383
384
    @syncmethod
385
    def get_node_class(self):
386
        pass
387
388
    @syncmethod
389
    def get_attributes(self):
390
        pass
391
392
class Subscription:
393
    def __init__(self, tloop, sub):
394
        self.tloop = tloop
395
        self.aio_obj = sub
396
397
    @syncmethod
398
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
399
        pass
400
401
    @syncmethod
402
    def subscribe_events(
403
        self,
404
        sourcenode=ua.ObjectIds.Server,
405
        evtypes=ua.ObjectIds.BaseEventType,
406
        evfilter=None,
407
        queuesize=0,
408
    ):
409
        pass
410
411
    @syncmethod
412
    def unsubscribe(self, handle):
413
        pass
414
415
    @syncmethod
416
    async def create_monitored_items(self, monitored_items):
417
        pass
418
419
    @syncmethod
420
    def delete(self):
421
        pass
422